Commit fb31ab0

SNOW-3011964 Add test scenario confirming timestamp conversion quirks + refactor test

1 parent d557dce commit fb31ab0

10 files changed, +291 −148 lines changed

src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java

Lines changed: 94 additions & 2 deletions

@@ -1,31 +1,57 @@
 package com.snowflake.kafka.connector;
 
+import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_TOPICS2TABLE_MAP;
 import static org.awaitility.Awaitility.await;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams;
+import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
+import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
 import com.snowflake.kafka.connector.internal.TestUtils;
 import com.snowflake.kafka.connector.internal.streaming.FakeIngestClientSupplier;
 import com.snowflake.kafka.connector.internal.streaming.FakeSnowflakeStreamingIngestClient;
+import com.snowflake.kafka.connector.internal.streaming.v2.StreamingClientManager;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestInstance;
 
 /** Base class for integration tests using an embedded Kafka Connect cluster. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class ConnectClusterBaseIT {
 
-  protected EmbeddedConnectCluster connectCluster;
+  static final String MOCK_SCHEMA_REGISTRY_URL = "mock://test-schema-registry";
+  static final String SCHEMA_REGISTRY_SCOPE = "test-schema-registry";
+  static final int PARTITION_COUNT = 1;
+  static final int RECORD_COUNT = 100;
+  static final int TOPIC_COUNT = 2;
+  static final Integer TASK_NUMBER = 1;
 
   protected final FakeIngestClientSupplier fakeClientSupplier = new FakeIngestClientSupplier();
+  protected final ObjectMapper objectMapper = new ObjectMapper();
 
-  static final Integer TASK_NUMBER = 1;
+  protected String tableName;
+  protected String connectorName;
+  protected String topic0;
+  protected String topic1;
+  protected EmbeddedConnectCluster connectCluster;
+  protected SnowflakeConnectionService snowflake;
 
   @BeforeAll
   public void beforeAll() {
@@ -51,6 +77,34 @@ public void afterAll() {
     }
   }
 
+  @BeforeEach
+  void before() {
+    tableName = TestUtils.randomTableName();
+    connectorName = String.format("%s_connector", tableName);
+    topic0 = tableName + "0";
+    topic1 = tableName + "1";
+    connectCluster.kafka().createTopic(topic0, PARTITION_COUNT);
+    connectCluster.kafka().createTopic(topic1, PARTITION_COUNT);
+    snowflake =
+        SnowflakeConnectionServiceFactory.builder()
+            .setProperties(TestUtils.transformProfileFileToConnectorConfiguration(false))
+            .noCaching()
+            .build();
+
+    StreamingClientManager.resetIngestClientSupplier();
+  }
+
+  @AfterEach
+  void after() {
+    connectCluster.kafka().deleteTopic(topic0);
+    connectCluster.kafka().deleteTopic(topic1);
+    connectCluster.deleteConnector(connectorName);
+    StreamingClientManager.resetIngestClientSupplier();
+    TestUtils.dropTable(tableName);
+    MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
+  }
+
   protected FakeSnowflakeStreamingIngestClient getOpenedFakeIngestClient(String connectorName) {
     await("channelsCreated")
         .atMost(Duration.ofSeconds(60))
@@ -115,6 +169,44 @@ protected final void waitForConnectorStopped(String connectorName) {
     }
   }
 
+  protected KafkaProducer<String, Object> createAvroProducer() {
+    final Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectCluster.kafka().bootstrapServers());
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
+    props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
+    return new KafkaProducer<>(props, new StringSerializer(), createAvroSerializer());
+  }
+
+  protected KafkaAvroSerializer createAvroSerializer() {
+    final SchemaRegistryClient schemaRegistryClient =
+        MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE);
+    final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
+    serializer.configure(Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL), false);
+    return serializer;
+  }
+
+  protected Map<String, String> createConnectorConfig() {
+    final String topics = topic0 + "," + topic1;
+    final String topicsToTableMap = topic0 + ":" + tableName + "," + topic1 + ":" + tableName;
+
+    final Map<String, String> config = defaultProperties(topics, connectorName);
+    config.put(SNOWFLAKE_TOPICS2TABLE_MAP, topicsToTableMap);
+    config.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+    config.put("value.converter.schemas.enable", "false");
+    config.put("errors.tolerance", "none");
+    config.put("errors.log.enable", "true");
+    config.put("errors.deadletterqueue.topic.name", "DLQ_TOPIC");
+    config.put("errors.deadletterqueue.topic.replication.factor", "1");
+    config.put("jmx", "true");
+    return config;
+  }
+
+  void sendTombstoneRecords(final String topic) {
+    // Send null tombstone
+    connectCluster.kafka().produce(topic, null);
+  }
+
   private FakeSnowflakeStreamingIngestClient getFakeSnowflakeStreamingIngestClient(
       String connectorName) {
     return fakeClientSupplier.getFakeIngestClients().stream()
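
A note on the mock-registry plumbing these new helpers rely on: Confluent's mock://<scope> URLs resolve to an in-memory schema registry shared per scope name, which is why createAvroSerializer() obtains its client via MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE) and why after() drops the scope. Below is a minimal standalone round-trip sketch of the same pattern; the scope, topic, and schema names are illustrative, not from this commit:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MockRegistryRoundTrip {
  public static void main(String[] args) {
    // Serializer and deserializer share one in-memory registry via the scope name.
    SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("demo-scope");
    Map<String, Object> cfg = Map.of("schema.registry.url", "mock://demo-scope");

    KafkaAvroSerializer serializer = new KafkaAvroSerializer(client);
    serializer.configure(cfg, false); // false = configure as a value (de)serializer
    KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(client);
    deserializer.configure(cfg, false);

    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", 42L);

    // serialize() registers the schema in the mock registry as a side effect.
    byte[] bytes = serializer.serialize("demo-topic", record);
    GenericRecord back = (GenericRecord) deserializer.deserialize("demo-topic", bytes);
    System.out.println(back); // {"id": 42}

    MockSchemaRegistry.dropScope("demo-scope"); // mirror the cleanup done in after()
  }
}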
src/test/java/com/snowflake/kafka/connector/SNOW_3011964_IT.java

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+package com.snowflake.kafka.connector;
+
+import static com.snowflake.kafka.connector.internal.TestUtils.assertWithRetry;
+import static com.snowflake.kafka.connector.internal.TestUtils.getTableContentOneRow;
+import static com.snowflake.kafka.connector.internal.TestUtils.loadClasspathResource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.snowflake.kafka.connector.internal.TestUtils;
+import io.confluent.connect.avro.AvroConverter;
+import java.sql.Timestamp;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class SNOW_3011964_IT extends ConnectClusterBaseIT {
+
+  private static final String SCHEMA_SNOW_3011964 =
+      loadClasspathResource("/com/snowflake/kafka/connector/avroschemas/schema3.json");
+  public static final String TIMESTAMP_MILLIS = "TIMESTAMP_MILLIS";
+  public static final String TIMESTAMP_LONG = "TIMESTAMP_LONG";
+
+  private KafkaProducer<String, Object> avroProducer;
+
+  @BeforeEach
+  void beforeEach() {
+    avroProducer = createAvroProducer();
+  }
+
+  @AfterEach
+  void afterEach() {
+    if (avroProducer != null) {
+      avroProducer.close();
+    }
+  }
+
+  @Test
+  void test_SNOW_3011964_IT() throws Exception {
+    // given
+    snowflake.executeQueryWithParameters(
+        String.format(
+            "create table %s (%s TIMESTAMP(6), %s TIMESTAMP(6)) enable_schema_evolution = false",
+            tableName, TIMESTAMP_MILLIS, TIMESTAMP_LONG));
+    final Map<String, String> config = createConnectorConfig();
+    config.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, AvroConverter.class.getName());
+    config.put("value.converter.schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
+    connectCluster.configureConnector(connectorName, config);
+    waitForConnectorRunning(connectorName);
+
+    // when
+    final Schema schema = new Schema.Parser().parse(SCHEMA_SNOW_3011964);
+    final GenericRecord record = new GenericData.Record(schema);
+    record.put(TIMESTAMP_MILLIS, 1768486316048L);
+    record.put(TIMESTAMP_LONG, 1768486316048L);
+    avroProducer.send(new ProducerRecord<>(topic0, "key1", record));
+    avroProducer.flush();
+
+    // then
+    assertWithRetry(() -> TestUtils.getNumberOfRows(tableName) == 1);
+    Map<String, Object> firstRow = getTableContentOneRow(tableName);
+    final Object avroLogicalTsAfterDbSave = firstRow.get(TIMESTAMP_MILLIS);
+    final Object rawLongTsAfterDbSave = firstRow.get(TIMESTAMP_LONG);
+
+    // Saved and fetched correctly, because the Avro logical type here is timestamp-millis.
+    // Internally, before sending this value to the Snowpipe SDK, the connector converts it
+    // to a string (detected by inspecting the schema). When Snowflake sees that the
+    // destination column is TIMESTAMP and the sent value is a String number, it converts
+    // it to a proper timestamp.
+    assertThat(avroLogicalTsAfterDbSave)
+        .isEqualTo(new Timestamp(1768486316048L)); // "2026-01-15 14:11:56.048"
+    // Not saved correctly: the value fetched from the database is 1000x greater.
+    // The Avro type of this value is a plain long, so it is written to the database as a long.
+    // When Snowflake stores a numeric long in a TIMESTAMP column, it treats the number as
+    // seconds (not milliseconds), effectively multiplying the intended value by 1000.
+    assertThat(rawLongTsAfterDbSave)
+        .isEqualTo(new Timestamp(1768486316048L * 1000)); // "58011-02-06 06:54:08.0"
+  }
+}
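
The quirk the two assertions above pin down comes down to unit interpretation: the same epoch value read as seconds instead of milliseconds lands roughly 56,000 years in the future. A small self-contained illustration of the arithmetic (the class name is illustrative):

import java.time.Instant;

public class TimestampUnitsDemo {
  public static void main(String[] args) {
    long epochMillis = 1768486316048L; // the value produced in the test above

    // Read as milliseconds -- what the producer intended:
    System.out.println(Instant.ofEpochMilli(epochMillis)); // 2026-01-15T14:11:56.048Z

    // Read as seconds -- how a raw long is interpreted in a TIMESTAMP column,
    // equivalent to multiplying the intended instant by 1000:
    System.out.println(Instant.ofEpochSecond(epochMillis)); // on the order of year 58011
  }
}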

src/test/java/com/snowflake/kafka/connector/SchemaEvolutionAvroSrIT.java

Lines changed: 9 additions & 52 deletions

@@ -2,26 +2,21 @@
 
 import static com.snowflake.kafka.connector.internal.TestUtils.assertTableColumnCount;
 import static com.snowflake.kafka.connector.internal.TestUtils.assertWithRetry;
+import static com.snowflake.kafka.connector.internal.TestUtils.loadClasspathResource;
 
 import com.snowflake.kafka.connector.internal.TestUtils;
 import io.confluent.connect.avro.AvroConverter;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -32,9 +27,7 @@
  * updated with correct column types when records with different Avro schemas are sent from multiple
  * topics.
  */
-class SchemaEvolutionAvroSrIT extends SchemaEvolutionBase {
-
-  private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://test-schema-registry";
+class SchemaEvolutionAvroSrIT extends ConnectClusterBaseIT {
 
   private static final String PERFORMANCE_STRING = "PERFORMANCE_STRING";
   private static final String PERFORMANCE_CHAR = "PERFORMANCE_CHAR";
@@ -66,29 +59,11 @@ class SchemaEvolutionAvroSrIT extends SchemaEvolutionBase {
     EXPECTED_SCHEMA.put(RECORD_METADATA, "VARIANT");
   }
 
-  private static final String VALUE_SCHEMA_0 =
-      "{\"type\": \"record\",\"name\": \"value_schema_0\",\"fields\": [ {\"name\":"
-          + " \"PERFORMANCE_CHAR\", \"type\": \"string\"}, {\"name\": \"PERFORMANCE_STRING\","
-          + " \"type\": \"string\"},"
-          + " {\"name\":\"TIME_MILLIS\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},"
-          + "{\"name\":\"DATE\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"DECIMAL\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\","
-          + " \"precision\":4, \"scale\":2}},"
-          + "{\"name\":\"TIMESTAMP_MILLIS\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
-          + " {\"name\": \"RATING_INT\", \"type\": \"int\"}]}";
-
-  private static final String VALUE_SCHEMA_1 =
-      "{"
-          + "\"type\": \"record\","
-          + "\"name\": \"value_schema_1\","
-          + "\"fields\": ["
-          + "  {\"name\": \"RATING_DOUBLE\", \"type\": \"float\"},"
-          + "  {\"name\": \"PERFORMANCE_STRING\", \"type\": \"string\"},"
-          + "  {\"name\": \"APPROVAL\", \"type\": \"boolean\"},"
-          + "  {\"name\": \"SOME_FLOAT_NAN\", \"type\": \"float\"}"
-          + "]"
-          + "}";
-
-  private static final String SCHEMA_REGISTRY_SCOPE = "test-schema-registry";
+  private static final String SCHEMA_0 =
+      loadClasspathResource("/com/snowflake/kafka/connector/avroschemas/schema1.json");
+  private static final String SCHEMA_1 =
+      loadClasspathResource("/com/snowflake/kafka/connector/avroschemas/schema2.json");
 
   private static final int COL_NUM = 11;
 
   private KafkaProducer<String, Object> avroProducer;
@@ -103,7 +78,6 @@ void afterEach() {
     if (avroProducer != null) {
       avroProducer.close();
     }
-    MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
   }
 
   @Test
@@ -127,25 +101,8 @@ void testSchemaEvolutionWithMultipleTopicsAndAvroSr() throws Exception {
     TestUtils.checkTableSchema(tableName, EXPECTED_SCHEMA);
   }
 
-  private KafkaProducer<String, Object> createAvroProducer() {
-    final Properties props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectCluster.kafka().bootstrapServers());
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
-    props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
-    return new KafkaProducer<>(props, new StringSerializer(), createAvroSerializer());
-  }
-
-  private KafkaAvroSerializer createAvroSerializer() {
-    final SchemaRegistryClient schemaRegistryClient =
-        MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE);
-    final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
-    serializer.configure(Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL), false);
-    return serializer;
-  }
-
   private void sendRecordsToTopic0() {
-    final Schema schema = new Schema.Parser().parse(VALUE_SCHEMA_0);
+    final Schema schema = new Schema.Parser().parse(SCHEMA_0);
     for (int i = 0; i < RECORD_COUNT; i++) {
       final GenericRecord record = createTopic0Record(schema);
       avroProducer.send(new ProducerRecord<>(topic0, "key-" + i, record));
@@ -154,7 +111,7 @@ private void sendRecordsToTopic0() {
   }
 
   private void sendRecordsToTopic1() {
-    final Schema schema = new Schema.Parser().parse(VALUE_SCHEMA_1);
+    final Schema schema = new Schema.Parser().parse(SCHEMA_1);
     for (int i = 0; i < RECORD_COUNT; i++) {
       final GenericRecord record = createTopic1Record(schema);
       avroProducer.send(new ProducerRecord<>(topic1, "key-" + i, record));
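
The refactor side of this commit replaces the inline Avro schema strings (VALUE_SCHEMA_0/VALUE_SCHEMA_1) with JSON resources on the test classpath (e.g. /com/snowflake/kafka/connector/avroschemas/schema1.json), loaded through TestUtils.loadClasspathResource. That helper's body is not part of this diff; a plausible shape, assuming a plain read-resource-as-UTF-8-string utility:

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class ClasspathResources {
  // Assumed shape of TestUtils.loadClasspathResource; the real helper lives in
  // the repo's TestUtils and may differ in naming and error handling.
  static String loadClasspathResource(String path) {
    try (InputStream in = ClasspathResources.class.getResourceAsStream(path)) {
      if (in == null) {
        throw new IllegalArgumentException("Resource not found on classpath: " + path);
      }
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}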
