@@ -1,31 +1,57 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams.SNOWFLAKE_TOPICS2TABLE_MAP;
import static org.awaitility.Awaitility.await;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.snowflake.kafka.connector.Constants.KafkaConnectorConfigParams;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.FakeIngestClientSupplier;
import com.snowflake.kafka.connector.internal.streaming.FakeSnowflakeStreamingIngestClient;
import com.snowflake.kafka.connector.internal.streaming.v2.StreamingClientManager;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;

/** Base class for integration tests using an embedded Kafka Connect cluster. */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class ConnectClusterBaseIT {

protected EmbeddedConnectCluster connectCluster;
static final String MOCK_SCHEMA_REGISTRY_URL = "mock://test-schema-registry";
static final String SCHEMA_REGISTRY_SCOPE = "test-schema-registry";
static final int PARTITION_COUNT = 1;
static final int RECORD_COUNT = 100;
static final int TOPIC_COUNT = 2;
static final Integer TASK_NUMBER = 1;

protected final FakeIngestClientSupplier fakeClientSupplier = new FakeIngestClientSupplier();
protected final ObjectMapper objectMapper = new ObjectMapper();

static final Integer TASK_NUMBER = 1;
protected String tableName;
protected String connectorName;
protected String topic0;
protected String topic1;
protected EmbeddedConnectCluster connectCluster;
protected SnowflakeConnectionService snowflake;

@BeforeAll
public void beforeAll() {
@@ -51,6 +77,34 @@ public void afterAll() {
}
}

@BeforeEach
void before() {

tableName = TestUtils.randomTableName();
connectorName = String.format("%s_connector", tableName);
topic0 = tableName + "0";
topic1 = tableName + "1";
connectCluster.kafka().createTopic(topic0, PARTITION_COUNT);
connectCluster.kafka().createTopic(topic1, PARTITION_COUNT);
snowflake =
SnowflakeConnectionServiceFactory.builder()
.setProperties(TestUtils.transformProfileFileToConnectorConfiguration(false))
.noCaching()
.build();

StreamingClientManager.resetIngestClientSupplier();
}

@AfterEach
void after() {
connectCluster.kafka().deleteTopic(topic0);
connectCluster.kafka().deleteTopic(topic1);
connectCluster.deleteConnector(connectorName);
StreamingClientManager.resetIngestClientSupplier();
TestUtils.dropTable(tableName);
MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
}

protected FakeSnowflakeStreamingIngestClient getOpenedFakeIngestClient(String connectorName) {
await("channelsCreated")
.atMost(Duration.ofSeconds(60))
@@ -115,6 +169,44 @@ protected final void waitForConnectorStopped(String connectorName) {
}
}

protected KafkaProducer<String, Object> createAvroProducer() {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectCluster.kafka().bootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
return new KafkaProducer<>(props, new StringSerializer(), createAvroSerializer());
}

protected KafkaAvroSerializer createAvroSerializer() {
final SchemaRegistryClient schemaRegistryClient =
MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE);
final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
serializer.configure(Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL), false);
return serializer;
}

protected Map<String, String> createConnectorConfig() {
final String topics = topic0 + "," + topic1;
final String topicsToTableMap = topic0 + ":" + tableName + "," + topic1 + ":" + tableName;

final Map<String, String> config = defaultProperties(topics, connectorName);
config.put(SNOWFLAKE_TOPICS2TABLE_MAP, topicsToTableMap);
config.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
config.put("value.converter.schemas.enable", "false");
config.put("errors.tolerance", "none");
config.put("errors.log.enable", "true");
config.put("errors.deadletterqueue.topic.name", "DLQ_TOPIC");
config.put("errors.deadletterqueue.topic.replication.factor", "1");
config.put("jmx", "true");
return config;
}

void sendTombstoneRecords(final String topic) {
// Send null tombstone
connectCluster.kafka().produce(topic, null);
}

private FakeSnowflakeStreamingIngestClient getFakeSnowflakeStreamingIngestClient(
String connectorName) {
return fakeClientSupplier.getFakeIngestClients().stream()
@@ -0,0 +1,84 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.internal.TestUtils.assertWithRetry;
import static com.snowflake.kafka.connector.internal.TestUtils.getTableContentOneRow;
import static com.snowflake.kafka.connector.internal.TestUtils.loadClasspathResource;
import static org.assertj.core.api.Assertions.assertThat;

import com.snowflake.kafka.connector.internal.TestUtils;
import io.confluent.connect.avro.AvroConverter;
import java.sql.Timestamp;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class SNOW_3011964_IT extends ConnectClusterBaseIT {

private static final String SCHEMA_SNOW_3011964 =
loadClasspathResource("/com/snowflake/kafka/connector/avroschemas/schema3.json");
public static final String TIMESTAMP_MILLIS = "TIMESTAMP_MILLIS";
public static final String TIMESTAMP_LONG = "TIMESTAMP_LONG";

private KafkaProducer<String, Object> avroProducer;

@BeforeEach
void beforeEach() {
avroProducer = createAvroProducer();
}

@AfterEach
void afterEach() {
if (avroProducer != null) {
avroProducer.close();
}
}

@Test
void test_SNOW_3011964_IT() throws Exception {
// given
snowflake.executeQueryWithParameters(
String.format(
"create table %s (%s TIMESTAMP(6), %s TIMESTAMP(6)) enable_schema_evolution = false",
tableName, TIMESTAMP_MILLIS, TIMESTAMP_LONG));
final Map<String, String> config = createConnectorConfig();
config.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, AvroConverter.class.getName());
config.put("value.converter.schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
connectCluster.configureConnector(connectorName, config);
waitForConnectorRunning(connectorName);

// when
final Schema schema = new Schema.Parser().parse(SCHEMA_SNOW_3011964);
final GenericRecord record = new GenericData.Record(schema);
record.put(TIMESTAMP_MILLIS, 1768486316048L);
record.put(TIMESTAMP_LONG, 1768486316048L);
avroProducer.send(new ProducerRecord<>(topic0, "key1", record));
avroProducer.flush();

// then
assertWithRetry(() -> TestUtils.getNumberOfRows(tableName) == 1);
Map<String, Object> firstRow = getTableContentOneRow(tableName);
final Object avroLogicalTsAfterDbSave = firstRow.get(TIMESTAMP_MILLIS);
final Object rawLongTsAfterDbSave = firstRow.get(TIMESTAMP_LONG);

// Saved and fetched correctly because the Avro logical type of this field is timestamp-millis.
// Internally, before sending the value to the Snowpipe SDK, the connector converts it to a
// string (it detects the logical type by inspecting the schema). When Snowflake sees that the
// destination column is TIMESTAMP and the incoming value is a numeric string, it converts it
// to a proper timestamp.
assertThat(avroLogicalTsAfterDbSave)
.isEqualTo(new Timestamp(1768486316048L)); // "2026-01-15 14:11:56.048"
// Not saved correctly: the value fetched from the database is 1000x greater than expected.
// The Avro type of this field is a plain long, so it is written to the database as a number.
// When Snowflake stores a numeric value into a TIMESTAMP column it treats it as seconds
// (not milliseconds), which effectively multiplies the original millisecond value by 1000.
// (A standalone sketch of this arithmetic follows this class.)
assertThat(rawLongTsAfterDbSave)
.isEqualTo(new Timestamp(1768486316048L * 1000)); // "58011-02-06 06:54:08.0"
}
}
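
A minimal standalone sketch of the arithmetic the assertions above rely on (the class name is hypothetical; it is not part of the connector or this PR): java.sql.Timestamp takes epoch milliseconds, so 1768486316048 read as milliseconds lands in January 2026, while the same number interpreted server-side as seconds corresponds to a millisecond value 1000x larger, roughly year 58011. Printed values depend on the JVM time zone.

import java.sql.Timestamp;

// Hypothetical illustration; mirrors the values expected by SNOW_3011964_IT.
public class TimestampInterpretationSketch {

  public static void main(String[] args) {
    // Epoch milliseconds produced by the test.
    long epochMillis = 1768486316048L;

    // Path 1: Avro timestamp-millis logical type. Per the test comments, the connector sends
    // the value as a string and Snowflake parses it into the intended instant.
    Timestamp logicalTypePath = new Timestamp(epochMillis);
    System.out.println(logicalTypePath); // ~2026-01-15 14:11:56.048 (time-zone dependent)

    // Path 2: plain Avro long. A numeric value written into a TIMESTAMP column is treated as
    // seconds since the epoch, i.e. a millisecond value 1000x larger than intended.
    Timestamp plainLongPath = new Timestamp(epochMillis * 1000L);
    System.out.println(plainLongPath); // ~year 58011
  }
}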
@@ -2,26 +2,21 @@

import static com.snowflake.kafka.connector.internal.TestUtils.assertTableColumnCount;
import static com.snowflake.kafka.connector.internal.TestUtils.assertWithRetry;
import static com.snowflake.kafka.connector.internal.TestUtils.loadClasspathResource;

import com.snowflake.kafka.connector.internal.TestUtils;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -32,9 +27,7 @@
* updated with correct column types when records with different Avro schemas are sent from multiple
* topics.
*/
class SchemaEvolutionAvroSrIT extends SchemaEvolutionBase {

private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://test-schema-registry";
class SchemaEvolutionAvroSrIT extends ConnectClusterBaseIT {

private static final String PERFORMANCE_STRING = "PERFORMANCE_STRING";
private static final String PERFORMANCE_CHAR = "PERFORMANCE_CHAR";
@@ -66,29 +59,11 @@ class SchemaEvolutionAvroSrIT extends SchemaEvolutionBase {
EXPECTED_SCHEMA.put(RECORD_METADATA, "VARIANT");
}

private static final String VALUE_SCHEMA_0 =
"{\"type\": \"record\",\"name\": \"value_schema_0\",\"fields\": [ {\"name\":"
+ " \"PERFORMANCE_CHAR\", \"type\": \"string\"}, {\"name\": \"PERFORMANCE_STRING\","
+ " \"type\": \"string\"},"
+ " {\"name\":\"TIME_MILLIS\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},"
+ "{\"name\":\"DATE\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},{\"name\":\"DECIMAL\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\","
+ " \"precision\":4, \"scale\":2}},"
+ "{\"name\":\"TIMESTAMP_MILLIS\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+ " {\"name\": \"RATING_INT\", \"type\": \"int\"}]}";

private static final String VALUE_SCHEMA_1 =
"{"
+ "\"type\": \"record\","
+ "\"name\": \"value_schema_1\","
+ "\"fields\": ["
+ " {\"name\": \"RATING_DOUBLE\", \"type\": \"float\"},"
+ " {\"name\": \"PERFORMANCE_STRING\", \"type\": \"string\"},"
+ " {\"name\": \"APPROVAL\", \"type\": \"boolean\"},"
+ " {\"name\": \"SOME_FLOAT_NAN\", \"type\": \"float\"}"
+ "]"
+ "}";

private static final String SCHEMA_REGISTRY_SCOPE = "test-schema-registry";
private static final String SCHEMA_0 =
loadClasspathResource("/com/snowflake/kafka/connector/avroschemas/schema1.json");
private static final String SCHEMA_1 =
loadClasspathResource("/com/snowflake/kafka/connector/avroschemas/schema2.json");

private static final int COL_NUM = 11;

private KafkaProducer<String, Object> avroProducer;
@@ -103,7 +78,6 @@ void afterEach() {
if (avroProducer != null) {
avroProducer.close();
}
MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
}

@Test
@@ -127,25 +101,8 @@ void testSchemaEvolutionWithMultipleTopicsAndAvroSr() throws Exception {
TestUtils.checkTableSchema(tableName, EXPECTED_SCHEMA);
}

private KafkaProducer<String, Object> createAvroProducer() {
final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, connectCluster.kafka().bootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL);
return new KafkaProducer<>(props, new StringSerializer(), createAvroSerializer());
}

private KafkaAvroSerializer createAvroSerializer() {
final SchemaRegistryClient schemaRegistryClient =
MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE);
final KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);
serializer.configure(Map.of("schema.registry.url", MOCK_SCHEMA_REGISTRY_URL), false);
return serializer;
}

private void sendRecordsToTopic0() {
final Schema schema = new Schema.Parser().parse(VALUE_SCHEMA_0);
final Schema schema = new Schema.Parser().parse(SCHEMA_0);
for (int i = 0; i < RECORD_COUNT; i++) {
final GenericRecord record = createTopic0Record(schema);
avroProducer.send(new ProducerRecord<>(topic0, "key-" + i, record));
@@ -154,7 +111,7 @@ private void sendRecordsToTopic0() {
}

private void sendRecordsToTopic1() {
final Schema schema = new Schema.Parser().parse(VALUE_SCHEMA_1);
final Schema schema = new Schema.Parser().parse(SCHEMA_1);
for (int i = 0; i < RECORD_COUNT; i++) {
final GenericRecord record = createTopic1Record(schema);
avroProducer.send(new ProducerRecord<>(topic1, "key-" + i, record));