From 2e9c8e45862d875b37e4f58560b48b10e87b0e82 Mon Sep 17 00:00:00 2001
From: Toby Zhang
Date: Wed, 5 Jul 2023 19:07:53 -0700
Subject: [PATCH 1/8] SNOW-850959: Fix a wrong result issue that offsets are skipped when schematization is enabled (#658)

There is a gap in KC that can skip ingesting some offsets. Consider the case where two topics with different schemas ingest into the same table: internally KC creates two channels (channel A and channel B) with offset_token=NULL, and both channels start to buffer data and flush files. Channel A then fails to commit its first batch because the file schema doesn't match the latest table schema due to schema evolution, so channel A is invalidated and reopened, but we don't reset the consumer offset because the offset_token for channel A is still NULL. Since we rely on Kafka to send us the correct data when the offset_token is NULL, Kafka keeps sending the next batch and we accept it, which means the first batch for channel A is skipped forever. We need to rethink what to do when the offset_token for a channel is NULL; we cannot purely rely on Kafka to resend us the correct offset.

The fix is to manage the Kafka consumer offset in the connector as well and use it to reset Kafka when the offset token for a channel is NULL, instead of relying on Kafka to send us the correct offset.
---
 .../streaming/SnowflakeSinkServiceV2.java | 4 +-
 .../streaming/TopicPartitionChannel.java | 96 ++++++-----
 .../streaming/SnowflakeSinkServiceV2IT.java | 51 +++++-
 ...hema_evolution_multi_topic_drop_table.json | 28 ++++
 .../test_schema_evolution_drop_table.py | 3 +-
 ...schema_evolution_multi_topic_drop_table.py | 109 +++++++++++++
 test/test_suites.py | 154 ++++++++++--------
 7 files changed, 324 insertions(+), 121 deletions(-)
 create mode 100644 test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
 create mode 100644 test/test_suit/test_schema_evolution_multi_topic_drop_table.py
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index e2e572246..09d4824f1 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -266,7 +266,9 @@ public long getOffset(TopicPartition topicPartition) { String partitionChannelKey = partitionChannelKey(topicPartition.topic(), topicPartition.partition()); if (partitionsToChannel.containsKey(partitionChannelKey)) { - return partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka(); + long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka(); + partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset); + return offset; } else { LOGGER.warn( "Topic: {} Partition: {} hasn't been initialized to get offset", diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 0babd1edf..14d68e8c6 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -83,17 +83,24 @@ public
class TopicPartitionChannel { // -------- private final fields -------- // - // This offset is updated when Snowflake has received offset from insertRows API - // We will update this value after calling offsetToken API for this channel + // This offset represents the data persisted in Snowflake. More specifically it is the Snowflake + // offset determined from the insertRows API call. It is set after calling the fetchOffsetToken + // API for this channel private final AtomicLong offsetPersistedInSnowflake = new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE); - // This offset is updated every time KC processed an offset, it's used to make sure that we won't - // process records that are already processed. It will be reset to the latest committed token - // every time we fetch it from Snowflake + // This offset represents the data buffered in KC. More specifically it is the KC offset to ensure + // exactly once functionality. On creation it is set to the latest committed token in Snowflake + // (see offsetPersistedInSnowflake) and updated on each new row from KC. private final AtomicLong processedOffset = new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE); + // This offset is a fallback to represent the data buffered in KC. It is similar to + // processedOffset, however it is only used to resend the offset when the channel offset token is + // NULL. It is updated to the first offset sent by KC (see processedOffset) or the offset + // persisted in Snowflake (see offsetPersistedInSnowflake) + private long latestConsumerOffset = NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; + /** * Offsets are reset in kafka when one of following cases arises in which we rely on source of * truth (Which is Snowflake's committed offsetToken) @@ -263,6 +270,11 @@ public TopicPartitionChannel( this.processedOffset.set(lastCommittedOffsetToken); if (lastCommittedOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { this.sinkTaskContext.offset(this.topicPartition, lastCommittedOffsetToken + 1L); + } else { + LOGGER.info( + "TopicPartitionChannel:{}, offset token is NULL, will rely on Kafka to send us the" + + " correct offset instead", + this.getChannelName()); } } @@ -281,6 +293,11 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get(); final long currentProcessedOffset = this.processedOffset.get(); + // Set the consumer offset to be the first record that Kafka sends us + if (latestConsumerOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + latestConsumerOffset = kafkaSinkRecord.kafkaOffset(); + } + // Ignore adding to the buffer until we see the expected offset value if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, currentProcessedOffset)) { return; @@ -345,7 +362,10 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { */ private boolean shouldIgnoreAddingRecordToBuffer( SinkRecord kafkaSinkRecord, long currentProcessedOffset) { - if (!isOffsetResetInKafka) { + // Don't skip rows if there is no offset reset or there is no offset token information in the + // channel + if (!isOffsetResetInKafka + || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { return false; } @@ -496,13 +516,8 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert // Due to schema evolution, we may need to reopen the channel and reset the offset in kafka // since it's possible that not all rows are ingested if (response.needToResetOffset()) { - final long 
offsetRecoveredFromSnowflake = - streamingApiFallbackSupplier( - StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK); - // If there is no valid offset token at server side even after the reset, retry it again - if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { - insertBufferedRecords(streamingBufferToInsert); - } + streamingApiFallbackSupplier( + StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK); } return response; } catch (TopicPartitionChannelInsertionException ex) { @@ -639,9 +654,6 @@ public InsertRowsResponse get() throws Throwable { } } } - // TODO SNOW-758492: for schematization, sleep a few seconds after the insert to avoid - // combining channels with different schemas - Thread.sleep(2000); } return new InsertRowsResponse(finalResponse, needToResetOffset); } @@ -877,20 +889,25 @@ private long streamingApiFallbackSupplier( private void resetChannelMetadataAfterRecovery( final StreamingApiFallbackInvoker streamingApiFallbackInvoker, final long offsetRecoveredFromSnowflake) { - // If we don't get a valid offset token from server side, reset the processed offset to invalid - // and rely on kafka to send us the correct data if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { - this.offsetPersistedInSnowflake.set(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE); - this.processedOffset.set(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE); - LOGGER.warn( - "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, skip reset kafka offset", + LOGGER.info( + "{} Channel:{}, offset token is NULL, will use the consumer offset managed by the" + + " connector instead, consumer offset:{}", streamingApiFallbackInvoker, this.getChannelName(), - offsetRecoveredFromSnowflake); + latestConsumerOffset); + } + + // If the offset token in the channel is null, use the consumer offset managed by the connector; + // otherwise use the offset token stored in the channel + final long offsetToResetInKafka = + offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE + ? latestConsumerOffset + : offsetRecoveredFromSnowflake + 1L; + if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { return; } - final long offsetToResetInKafka = offsetRecoveredFromSnowflake + 1L; // reset the buffer this.bufferLock.lock(); try { @@ -916,7 +933,7 @@ private void resetChannelMetadataAfterRecovery( this.bufferLock.unlock(); } LOGGER.warn( - "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, Reset kafka offset to:{}", + "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, reset kafka offset to:{}", streamingApiFallbackInvoker, this.getChannelName(), offsetRecoveredFromSnowflake, @@ -954,26 +971,15 @@ private long getRecoveredOffsetFromSnowflake( * snowflake. 
*/ private long fetchLatestCommittedOffsetFromSnowflake() { - LOGGER.debug( - "Fetching last committed offset for partition channel:{}", - this.channel.getFullyQualifiedName()); + LOGGER.debug("Fetching last committed offset for partition channel:{}", this.getChannelName()); String offsetToken = null; try { offsetToken = this.channel.getLatestCommittedOffsetToken(); - if (offsetToken == null) { - LOGGER.warn( - "OffsetToken not present for channelName:{}, will rely on kafka consumer offset as" - + " source of truth", - this.getChannelName()); - return NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; - } else { - long latestCommittedOffsetInSnowflake = Long.parseLong(offsetToken); - LOGGER.info( - "Fetched offsetToken:{} for channelName:{}", - latestCommittedOffsetInSnowflake, - this.channel.getFullyQualifiedName()); - return latestCommittedOffsetInSnowflake; - } + LOGGER.info( + "Fetched offsetToken for channelName:{}, offset:{}", this.getChannelName(), offsetToken); + return offsetToken == null + ? NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE + : Long.parseLong(offsetToken); } catch (NumberFormatException ex) { LOGGER.error( "The offsetToken string does not contain a parsable long:{} for channel:{}", @@ -1075,6 +1081,12 @@ protected SnowflakeTelemetryService getTelemetryServiceV2() { return this.telemetryServiceV2; } + protected void setLatestConsumerOffset(long consumerOffset) { + if (consumerOffset > this.latestConsumerOffset) { + this.latestConsumerOffset = consumerOffset; + } + } + /** * Converts the original kafka sink record into a Json Record. i.e key and values are converted * into Json so that it can be used to insert into variant column of Snowflake Table. diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index e7ca59baa..b7e21d90b 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -949,7 +949,6 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception { SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted); long startOffset = 0; - long endOffset = 0; SinkRecord avroRecordValue = new SinkRecord( @@ -969,12 +968,22 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception { .addTask(table, new TopicPartition(topic, partition)) .build(); - // Schema evolution should kick in to update the schema and then the rows should be ingested + // The first insert should fail and schema evolution will kick in to update the schema service.insert(avroRecordValue); TestUtils.assertWithRetry( - () -> service.getOffset(new TopicPartition(topic, partition)) == endOffset + 1, 20, 5); + () -> + service.getOffset(new TopicPartition(topic, partition)) + == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, + 20, + 5); TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION); + + // Retry the insert should succeed now with the updated schema + service.insert(avroRecordValue); + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5); + TestUtils.checkTableContentOneRow( table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION); @@ -1029,7 +1038,6 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception { SchemaAndValue 
jsonInputValue = jsonConverter.toConnectData(topic, converted); long startOffset = 0; - long endOffset = 0; SinkRecord jsonRecordValue = new SinkRecord( @@ -1049,11 +1057,21 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception { .addTask(table, new TopicPartition(topic, partition)) .build(); - // Schema evolution should kick in to update the schema and then the rows should be ingested + // The first insert should fail and schema evolution will kick in to update the schema service.insert(jsonRecordValue); TestUtils.assertWithRetry( - () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5); + () -> + service.getOffset(new TopicPartition(topic, partition)) + == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, + 20, + 5); TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_JSON_SCHEMA_FOR_TABLE_CREATION); + + // Retry the insert should succeed now with the updated schema + service.insert(jsonRecordValue); + TestUtils.assertWithRetry( + () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5); + TestUtils.checkTableContentOneRow( table, SchematizationTestUtils.CONTENT_FOR_JSON_TABLE_CREATION); @@ -1103,7 +1121,26 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce .addTask(table, new TopicPartition(topic, partition)) .build(); - // Schema evolution should kick in to update the schema and then the rows should be ingested + // The first insert should fail and schema evolution will kick in to add the column + service.insert(jsonRecordValue); + TestUtils.assertWithRetry( + () -> + service.getOffset(new TopicPartition(topic, partition)) + == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, + 20, + 5); + + // The second insert should fail again and schema evolution will kick in to update the + // nullability + service.insert(jsonRecordValue); + TestUtils.assertWithRetry( + () -> + service.getOffset(new TopicPartition(topic, partition)) + == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE, + 20, + 5); + + // Retry the insert should succeed now with the updated schema service.insert(jsonRecordValue); TestUtils.assertWithRetry( () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5); diff --git a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json new file mode 100644 index 000000000..38e50ab1b --- /dev/null +++ b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json @@ -0,0 +1,28 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", + "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", + "tasks.max": "1", + "buffer.flush.time": "10", + "buffer.count.records": "100", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": 
"false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.enable.schematization": true + } +} \ No newline at end of file diff --git a/test/test_suit/test_schema_evolution_drop_table.py b/test/test_suit/test_schema_evolution_drop_table.py index 8b303176c..06c2cf49f 100644 --- a/test/test_suit/test_schema_evolution_drop_table.py +++ b/test/test_suit/test_schema_evolution_drop_table.py @@ -46,7 +46,7 @@ def send(self): self.driver.sendBytesData(self.topic, value, key) # Sleep for some time and then verify the rows are ingested - sleep(60) + sleep(120) self.verify("0") # Recreate the table @@ -56,7 +56,6 @@ def send(self): "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) # Ingest another set of rows - sleep(30) self.driver.sendBytesData(self.topic, value, key) def verify(self, round): diff --git a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py new file mode 100644 index 000000000..b72a24408 --- /dev/null +++ b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py @@ -0,0 +1,109 @@ +import json +from time import sleep + +from test_suit.test_utils import NonRetryableError + + +# test if the table is updated with the correct column, and if the table is +# recreated and updated after it's being dropped +class TestSchemaEvolutionMultiTopicDropTable: + def __init__(self, driver, nameSalt): + self.driver = driver + self.fileName = "travis_correct_schema_evolution_multi_topic_drop_table" + self.topics = [] + self.table = self.fileName + nameSalt + self.recordNum = 100 + + for i in range(2): + self.topics.append(self.table + str(i)) + + self.driver.snowflake_conn.cursor().execute( + "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table)) + + self.driver.snowflake_conn.cursor().execute( + "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) + + self.records = [] + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + '"case_sensitive_PERFORMANCE_CHAR"': 'A', + 'RATING_INT': 100 + }) + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + 'RATING_DOUBLE': 0.99, + 'APPROVAL': True + }) + + self.gold_type = { + 'PERFORMANCE_STRING': 'VARCHAR', + 'case_sensitive_PERFORMANCE_CHAR': 'VARCHAR', + 'RATING_INT': 'NUMBER', + 'RATING_DOUBLE': 'FLOAT', + 'APPROVAL': 'BOOLEAN', + 'RECORD_METADATA': 'VARIANT' + } + + self.gold_columns = [columnName for columnName in self.gold_type] + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + for i, topic in enumerate(self.topics): + key = [] + value = [] + for e in range(self.recordNum): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + # Sleep for some time and then verify the rows are ingested + sleep(120) + self.verify("0") + + # Recreate the table + self.driver.snowflake_conn.cursor().execute( + "Create or replace table {} (PERFORMANCE_STRING STRING, RECORD_METADATA VARIANT)".format(self.table)) + self.driver.snowflake_conn.cursor().execute( + "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) + + # Ingest another set of rows + for i, topic in enumerate(self.topics): + key = [] + value = [] + for e in range(self.recordNum): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) 
+ value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + def verify(self, round): + rows = self.driver.snowflake_conn.cursor().execute( + "desc table {}".format(self.table)).fetchall() + res_col = {} + + gold_columns_copy = self.gold_columns.copy() + + for index, row in enumerate(rows): + gold_columns_copy.remove(row[0]) + if not row[1].startswith(self.gold_type[row[0]]): + raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1], + self.gold_type[ + row[0]])) + res_col[row[0]] = index + + print("Columns not in table: ", gold_columns_copy) + + for columnName in gold_columns_copy: + raise NonRetryableError("Column {} was not created".format(columnName)) + + res = self.driver.snowflake_conn.cursor().execute( + "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] + if res != self.recordNum * len(self.topics): + print("Number of record expected: {}, got: {}".format(self.recordNum * len(self.topics), res)) + raise NonRetryableError("Number of record in table is different from number of record sent") + + def clean(self): + self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suites.py b/test/test_suites.py index d1dcba794..4ce1a26c5 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -1,48 +1,5 @@ from collections import OrderedDict -from test_suit.test_string_json import TestStringJson -from test_suit.test_json_json import TestJsonJson -from test_suit.test_string_avro import TestStringAvro -from test_suit.test_avro_avro import TestAvroAvro -from test_suit.test_string_avrosr import TestStringAvrosr -from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr - -from test_suit.test_native_string_avrosr import TestNativeStringAvrosr -from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema -from test_suit.test_native_complex_smt import TestNativeComplexSmt - -from test_suit.test_native_string_protobuf import TestNativeStringProtobuf -from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf - -from test_suit.test_snowpipe_streaming_string_json import TestSnowpipeStreamingStringJson -from test_suit.test_snowpipe_streaming_string_json_dlq import TestSnowpipeStreamingStringJsonDLQ -from test_suit.test_snowpipe_streaming_string_avro_sr import TestSnowpipeStreamingStringAvroSR - -from test_suit.test_multiple_topic_to_one_table_snowpipe_streaming import \ - TestMultipleTopicToOneTableSnowpipeStreaming -from test_suit.test_multiple_topic_to_one_table_snowpipe import TestMultipleTopicToOneTableSnowpipe - -from test_suit.test_schema_mapping import TestSchemaMapping - -from test_suit.test_auto_table_creation import TestAutoTableCreation -from test_suit.test_auto_table_creation_topic2table import TestAutoTableCreationTopic2Table - -from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson -from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR - -from test_suit.test_schema_evolution_w_auto_table_creation_json import \ - TestSchemaEvolutionWithAutoTableCreationJson -from test_suit.test_schema_evolution_w_auto_table_creation_avro_sr import \ - TestSchemaEvolutionWithAutoTableCreationAvroSR - -from test_suit.test_schema_evolution_nonnullable_json import TestSchemaEvolutionNonNullableJson - -from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter - -from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable - -from 
test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ -# res tests from test_suit.resilience_tests.test_kc_delete_create import TestKcDeleteCreate from test_suit.resilience_tests.test_kc_delete_create_chaos import TestKcDeleteCreateChaos from test_suit.resilience_tests.test_kc_delete_resume import TestKcDeleteResume @@ -54,6 +11,38 @@ from test_suit.resilience_tests.test_kc_recreate import TestKcRecreate from test_suit.resilience_tests.test_kc_recreate_chaos import TestKcRecreateChaos from test_suit.resilience_tests.test_kc_restart import TestKcRestart +from test_suit.test_auto_table_creation import TestAutoTableCreation +from test_suit.test_auto_table_creation_topic2table import TestAutoTableCreationTopic2Table +from test_suit.test_avro_avro import TestAvroAvro +from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr +from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf +from test_suit.test_json_json import TestJsonJson +from test_suit.test_multiple_topic_to_one_table_snowpipe import TestMultipleTopicToOneTableSnowpipe +from test_suit.test_multiple_topic_to_one_table_snowpipe_streaming import \ + TestMultipleTopicToOneTableSnowpipeStreaming +from test_suit.test_native_complex_smt import TestNativeComplexSmt +from test_suit.test_native_string_avrosr import TestNativeStringAvrosr +from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema +from test_suit.test_native_string_protobuf import TestNativeStringProtobuf +from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR +from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable +from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson +from test_suit.test_schema_evolution_multi_topic_drop_table import TestSchemaEvolutionMultiTopicDropTable +from test_suit.test_schema_evolution_nonnullable_json import TestSchemaEvolutionNonNullableJson +from test_suit.test_schema_evolution_w_auto_table_creation_avro_sr import \ + TestSchemaEvolutionWithAutoTableCreationAvroSR +from test_suit.test_schema_evolution_w_auto_table_creation_json import \ + TestSchemaEvolutionWithAutoTableCreationJson +from test_suit.test_schema_mapping import TestSchemaMapping +from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter +from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ +from test_suit.test_snowpipe_streaming_string_avro_sr import TestSnowpipeStreamingStringAvroSR +from test_suit.test_snowpipe_streaming_string_json import TestSnowpipeStreamingStringJson +from test_suit.test_snowpipe_streaming_string_json_dlq import TestSnowpipeStreamingStringJsonDLQ +from test_suit.test_string_avro import TestStringAvro +from test_suit.test_string_avrosr import TestStringAvrosr +from test_suit.test_string_json import TestStringJson + class EndToEndTestSuite: ''' @@ -61,6 +50,7 @@ class EndToEndTestSuite: Just modify the caller constructor of this class to disable, enable in confluent or Apache Kafka. In future can add whether it runs in snowpipe or snowpipe streaming mode. 
''' + def __init__(self, test_instance, clean, run_in_confluent, run_in_apache): self._test_instance = test_instance self._clean = clean @@ -83,6 +73,7 @@ def run_in_confluent(self): def run_in_apache(self): return self._run_in_apache + def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testSet): ''' Creates all End to End tests which needs to run against Confluent Kafka or Apache Kafka. @@ -112,91 +103,114 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestAvrosrAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False )), ("TestNativeStringAvrosr", EndToEndTestSuite( - test_instance=TestNativeStringAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False + test_instance=TestNativeStringAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=False )), ("TestNativeStringJsonWithoutSchema", EndToEndTestSuite( - test_instance=TestNativeStringJsonWithoutSchema(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestNativeStringJsonWithoutSchema(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestNativeComplexSmt", EndToEndTestSuite( test_instance=TestNativeComplexSmt(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), ("TestNativeStringProtobuf", EndToEndTestSuite( - test_instance=TestNativeStringProtobuf(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestNativeStringProtobuf(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestConfluentProtobufProtobuf", EndToEndTestSuite( - test_instance=TestConfluentProtobufProtobuf(driver, nameSalt), clean=True, run_in_confluent=False, run_in_apache=False + test_instance=TestConfluentProtobufProtobuf(driver, nameSalt), clean=True, run_in_confluent=False, + run_in_apache=False )), ("TestSnowpipeStreamingStringJson", EndToEndTestSuite( - test_instance=TestSnowpipeStreamingStringJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSnowpipeStreamingStringJson(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestSnowpipeStreamingStringJsonDLQ", EndToEndTestSuite( - test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestSnowpipeStreamingStringAvro", EndToEndTestSuite( - test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False + test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=False )), ("TestMultipleTopicToOneTableSnowpipeStreaming", EndToEndTestSuite( - test_instance=TestMultipleTopicToOneTableSnowpipeStreaming(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestMultipleTopicToOneTableSnowpipeStreaming(driver, nameSalt), clean=True, + run_in_confluent=True, run_in_apache=True )), ("TestMultipleTopicToOneTableSnowpipe", EndToEndTestSuite( - test_instance=TestMultipleTopicToOneTableSnowpipe(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestMultipleTopicToOneTableSnowpipe(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), 
("TestSchemaMapping", EndToEndTestSuite( - test_instance=TestSchemaMapping(driver, nameSalt), clean=True, run_in_confluent=True,run_in_apache=True + test_instance=TestSchemaMapping(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), ("TestSnowpipeStreamingSchemaMappingDLQ", EndToEndTestSuite( - test_instance=TestSnowpipeStreamingSchemaMappingDLQ(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSnowpipeStreamingSchemaMappingDLQ(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestAutoTableCreation", EndToEndTestSuite( - test_instance=TestAutoTableCreation(driver, nameSalt, schemaRegistryAddress, testSet), clean=True, run_in_confluent=True, run_in_apache=False + test_instance=TestAutoTableCreation(driver, nameSalt, schemaRegistryAddress, testSet), clean=True, + run_in_confluent=True, run_in_apache=False )), ("TestAutoTableCreationTopic2Table", EndToEndTestSuite( - test_instance=TestAutoTableCreationTopic2Table(driver, nameSalt, schemaRegistryAddress, testSet), clean=True, run_in_confluent=True, run_in_apache=False + test_instance=TestAutoTableCreationTopic2Table(driver, nameSalt, schemaRegistryAddress, testSet), + clean=True, run_in_confluent=True, run_in_apache=False )), ("TestSchemaEvolutionJson", EndToEndTestSuite( - test_instance=TestSchemaEvolutionJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSchemaEvolutionJson(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestSchemaEvolutionAvroSR", EndToEndTestSuite( - test_instance=TestSchemaEvolutionAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False + test_instance=TestSchemaEvolutionAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=False )), ("TestSchemaEvolutionWithAutoTableCreationJson", EndToEndTestSuite( - test_instance=TestSchemaEvolutionWithAutoTableCreationJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSchemaEvolutionWithAutoTableCreationJson(driver, nameSalt), clean=True, + run_in_confluent=True, run_in_apache=True )), ("TestSchemaEvolutionWithAutoTableCreationAvroSR", EndToEndTestSuite( - test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False + test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True, + run_in_confluent=True, run_in_apache=False )), ("TestSchemaEvolutionNonNullableJson", EndToEndTestSuite( - test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestSchemaNotSupportedConverter", EndToEndTestSuite( - test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestSchemaEvolutionDropTable", EndToEndTestSuite( - test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestKcDeleteCreate", EndToEndTestSuite( test_instance=TestKcDeleteCreate(driver, nameSalt), 
clean=True, run_in_confluent=True, run_in_apache=True )), ("TestKcDeleteCreateChaos", EndToEndTestSuite( - test_instance=TestKcDeleteCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestKcDeleteCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestKcDeleteResume", EndToEndTestSuite( test_instance=TestKcDeleteResume(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), ("TestKcDeleteResumeChaos", EndToEndTestSuite( - test_instance=TestKcDeleteResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestKcDeleteResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestKcPauseCreate", EndToEndTestSuite( test_instance=TestKcPauseCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), ("TestKcPauseCreateChaos", EndToEndTestSuite( - test_instance=TestKcPauseCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestKcPauseCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestKcPauseResume", EndToEndTestSuite( test_instance=TestKcPauseResume(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), ("TestKcPauseResumeChaos", EndToEndTestSuite( - test_instance=TestKcPauseResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True + test_instance=TestKcPauseResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True )), ("TestKcRecreate", EndToEndTestSuite( test_instance=TestKcRecreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True @@ -207,7 +221,9 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS ("TestKcRestart", EndToEndTestSuite( test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), + ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( + test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + run_in_apache=True + )), ]) return test_suites - - From cf74b3966632632f226d198ed47b48839e27eef5 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Thu, 6 Jul 2023 17:40:14 -0700 Subject: [PATCH 2/8] [SNOW-857447] Enable one client optimization by default (#665) --- .../snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index f9abfc9fa..4adbc121f 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -159,7 +159,7 @@ public class SnowflakeSinkConnectorConfig { "enable.streaming.client.optimization"; public static final String ENABLE_STREAMING_CLIENT_OPTIMIZATION_DISPLAY = "Enable streaming client optimization"; - public static final boolean ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT = false; + public static final boolean ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT = true; public static final String ENABLE_STREAMING_CLIENT_OPTIMIZATION_DOC = "Whether to optimize the streaming client to reduce cost. 
Note that this may affect" + " throughput or latency and can only be set if Streaming Snowpipe is enabled"; From 0f570ee4108b25f7899e5858f5afedffeb843e9c Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 12 Jul 2023 13:04:51 -0700 Subject: [PATCH 3/8] Release version 1.9.4 (#666) --- pom.xml | 2 +- pom_confluent.xml | 2 +- src/main/java/com/snowflake/kafka/connector/Utils.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index f2fa58e9f..59ac26d6d 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ com.snowflake snowflake-kafka-connector - 1.9.3 + 1.9.4 jar Snowflake Kafka Connector Snowflake Kafka Connect Sink Connector diff --git a/pom_confluent.xml b/pom_confluent.xml index a4cdeeb10..599402997 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -12,7 +12,7 @@ com.snowflake snowflake-kafka-connector - 1.9.3 + 1.9.4 jar Snowflake Kafka Connector Snowflake Kafka Connect Sink Connector diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 8fcd22d12..9b24606b7 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -50,7 +50,7 @@ public class Utils { // Connector version, change every release - public static final String VERSION = "1.9.3"; + public static final String VERSION = "1.9.4"; // connector parameter list public static final String NAME = "name"; From 58b92470e03eab2d8ba7465a3e5f5b29b5b84f00 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 26 Jul 2023 13:30:11 -0700 Subject: [PATCH 4/8] [Streaming JMX #1] Small changes to Snowpipe Telemetry and JMX (#672) --- .../SnowflakeTelemetryBasicInfo.java | 12 +-------- .../SnowflakeTelemetryPipeCreation.java | 4 ++- .../SnowflakeTelemetryPipeStatus.java | 25 ++++++++++++++++--- .../telemetry/TelemetryConstants.java | 16 ++++++------ 4 files changed, 33 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java index abb940725..352615a34 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java @@ -9,27 +9,17 @@ public abstract class SnowflakeTelemetryBasicInfo { final String tableName; - /** - * User created or user visible stage name. - * - *
<p>
It is null for Snowpipe Streaming Telemetry. - */ - final String stageName; - static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName()); /** * Base Constructor. Accepts a tableName and StageName. * * @param tableName Checks for Nullability - * @param stageName Can be null (In case of Snowpipe Streaming since there is no user visible - * Snowflake Stage) */ - public SnowflakeTelemetryBasicInfo(final String tableName, final String stageName) { + public SnowflakeTelemetryBasicInfo(final String tableName) { Preconditions.checkArgument( !Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty"); this.tableName = tableName; - this.stageName = stageName; } /** diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java index 2f6cbd090..627f03cf0 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java @@ -24,11 +24,13 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo int fileCountReprocessPurge = 0; // files on stage that are purged due to reprocessing when cleaner starts long startTime; // start time of the pipe + private final String stageName; private final String pipeName; public SnowflakeTelemetryPipeCreation( final String tableName, final String stageName, final String pipeName) { - super(tableName, stageName); + super(tableName); + this.stageName = stageName; this.pipeName = pipeName; this.startTime = System.currentTimeMillis(); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java index 82c748d3c..3efd397b5 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java @@ -1,6 +1,10 @@ package com.snowflake.kafka.connector.internal.telemetry; -import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*; +import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_SUB_DOMAIN; +import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_TABLE_STAGE_INGESTION_FAIL; +import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.LATENCY_SUB_DOMAIN; +import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.OFFSET_SUB_DOMAIN; +import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_FILE_COUNT; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_MS; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_INGESTION_LAG_FILE_COUNT; @@ -9,23 +13,34 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_KAFKA_LAG_RECORD_COUNT; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.BYTE_NUMBER; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.CLEANER_RESTART_COUNT; +import static 
com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.COMMITTED_OFFSET; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.END_TIME; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_INGESTION; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_STAGE; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_PURGED; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_BROKEN_RECORD; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_INGEST_FAIL; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FLUSHED_OFFSET; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.MEMORY_USAGE; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PIPE_NAME; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PROCESSED_OFFSET; +import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PURGED_OFFSET; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.RECORD_NUMBER; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.STAGE_NAME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME; -import com.codahale.metrics.*; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; import com.snowflake.kafka.connector.internal.metrics.MetricsUtil.EventType; -import java.util.*; +import java.util.Arrays; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -108,6 +123,7 @@ public class SnowflakeTelemetryPipeStatus extends SnowflakeTelemetryBasicInfo { // May not be set if jmx is set to false private Meter fileCountTableStageBrokenRecordMeter, fileCountTableStageIngestFailMeter; + private final String stageName; private final String pipeName; public SnowflakeTelemetryPipeStatus( @@ -116,7 +132,8 @@ public SnowflakeTelemetryPipeStatus( final String pipeName, final boolean enableCustomJMXConfig, final MetricsJmxReporter metricsJmxReporter) { - super(tableName, stageName); + super(tableName); + this.stageName = stageName; this.pipeName = pipeName; // Initial value of processed/flushed/committed/purged offset should be set to -1, diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java index 7fc097a1b..1a1ac72d1 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java @@ -9,21 +9,21 @@ public final class TelemetryConstants { public static final String STAGE_NAME = "stage_name"; public static final String PIPE_NAME = 
"pipe_name"; - public static final String PROCESSED_OFFSET = "processed_offset"; - public static final String FLUSHED_OFFSET = "flushed_offset"; - public static final String COMMITTED_OFFSET = "committed_offset"; - public static final String PURGED_OFFSET = "purged_offset"; + public static final String PROCESSED_OFFSET = "processed-offset"; + public static final String FLUSHED_OFFSET = "flushed-offset"; + public static final String COMMITTED_OFFSET = "committed-offset"; + public static final String PURGED_OFFSET = "purged-offset"; public static final String RECORD_NUMBER = "record_number"; public static final String BYTE_NUMBER = "byte_number"; - public static final String FILE_COUNT_ON_STAGE = "file_count_on_stage"; - public static final String FILE_COUNT_ON_INGESTION = "file_count_on_ingestion"; - public static final String FILE_COUNT_PURGED = "file_count_purged"; + public static final String FILE_COUNT_ON_STAGE = "file-count-on-stage"; + public static final String FILE_COUNT_ON_INGESTION = "file-count-on-ingestion"; + public static final String FILE_COUNT_PURGED = "file-count-purged"; public static final String FILE_COUNT_TABLE_STAGE_INGEST_FAIL = "file_count_table_stage_ingest_fail"; public static final String FILE_COUNT_TABLE_STAGE_BROKEN_RECORD = - "file_count_table_stage_broken_record"; + "file-count-table-stage-broken-record"; public static final String CLEANER_RESTART_COUNT = "cleaner_restart_count"; From 88dd84ece216bf5d8673a7d1039472007bdbffb6 Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Fri, 28 Jul 2023 16:19:31 -0700 Subject: [PATCH 5/8] Revert "SNOW-811265 Do not close channel on rebalance (#651)" (#678) --- .../streaming/SnowflakeSinkServiceV2.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 09d4824f1..19bb7339a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -304,15 +304,14 @@ public void closeAll() { /** * This function is called during rebalance. * - *
<p>
We don't close the channels. Upon rebalance, (inside {@link - * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel - * anyways. [Check c'tor of {@link TopicPartitionChannel}] + *
<p>
All the channels are closed. The client is still active. Upon rebalance, (inside {@link + * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel. * *
<p>
We will wipe the cache partitionsToChannel so that in {@link * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we reinstantiate and fetch * offsetToken * - * @param partitions a list of topic partitions to close/shutdown + * @param partitions a list of topic partition */ @Override public void close(Collection partitions) { @@ -324,16 +323,15 @@ public void close(Collection partitions) { partitionsToChannel.get(partitionChannelKey); // Check for null since it's possible that the something goes wrong even before the // channels are created + if (topicPartitionChannel != null) { + topicPartitionChannel.closeChannel(); + } LOGGER.info( - "Removing partitionChannel:{}, partition:{}, topic:{} from map(partitionsToChannel)", + "Closing partitionChannel:{}, partition:{}, topic:{}", topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(), topicPartition.topic(), topicPartition.partition()); }); - LOGGER.info( - "Closing {} partitions and Clearing partitionsToChannel Map of size:{}", - partitions.size(), - partitionsToChannel.size()); partitionsToChannel.clear(); } From 5787fcb126562877aa6bb2f6b668ecbefd4cbfd4 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Sat, 29 Jul 2023 00:26:07 -0700 Subject: [PATCH 6/8] Create release v2.0.0 for GA (#676) Co-authored-by: Jay Patel --- pom.xml | 8 ++++---- pom_confluent.xml | 10 +++++----- .../com/snowflake/kafka/connector/Utils.java | 2 +- .../streaming/SnowflakeSinkServiceV2IT.java | 19 ------------------- 4 files changed, 10 insertions(+), 29 deletions(-) diff --git a/pom.xml b/pom.xml index 59ac26d6d..810fa1e6f 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ com.snowflake snowflake-kafka-connector - 1.9.4 + 2.0.0 jar Snowflake Kafka Connector Snowflake Kafka Connect Sink Connector @@ -334,7 +334,7 @@ net.snowflake snowflake-ingest-sdk - 2.0.1 + 2.0.2 net.snowflake @@ -369,12 +369,12 @@ com.fasterxml.jackson.core jackson-core - 2.13.1 + 2.15.2 com.fasterxml.jackson.core jackson-databind - 2.13.4.2 + 2.15.2 io.confluent diff --git a/pom_confluent.xml b/pom_confluent.xml index 599402997..0575d7bb7 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -12,7 +12,7 @@ com.snowflake snowflake-kafka-connector - 1.9.4 + 2.0.0 jar Snowflake Kafka Connector Snowflake Kafka Connect Sink Connector @@ -386,7 +386,7 @@ net.snowflake snowflake-ingest-sdk - 2.0.1 + 2.0.2 net.snowflake @@ -420,13 +420,13 @@ com.fasterxml.jackson.core - jackson-core - 2.13.1 + jackson-core + 2.15.2 com.fasterxml.jackson.core jackson-databind - 2.13.4.2 + 2.15.2 io.confluent diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 9b24606b7..21dbb2fea 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -50,7 +50,7 @@ public class Utils { // Connector version, change every release - public static final String VERSION = "1.9.4"; + public static final String VERSION = "2.0.0"; // connector parameter list public static final String NAME = "name"; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index b7e21d90b..71acb037e 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -3,7 +3,6 @@ import 
static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; -import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter; import com.snowflake.kafka.connector.internal.SchematizationTestUtils; import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; @@ -27,13 +26,11 @@ import java.util.List; import java.util.Map; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; -import net.snowflake.ingest.utils.SFException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.After; @@ -764,22 +761,6 @@ public void testBrokenRecordIngestionAfterValidRecord() throws Exception { service.closeAll(); } - @Test(expected = ConnectException.class) - public void testMissingPropertiesForStreamingClient() { - Map config = TestUtils.getConfForStreaming(); - config.remove(Utils.SF_ROLE); - SnowflakeSinkConnectorConfig.setDefaultValues(config); - - try { - SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) - .build(); - } catch (ConnectException ex) { - assert ex.getCause() instanceof SFException; - assert ex.getCause().getMessage().contains("Missing role"); - throw ex; - } - } - /* Service start -> Insert -> Close. service start -> fetch the offsetToken, compare and ingest check data */ @Test From 333d0992da0e3fac02d1a58a22441273d6dffad1 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Fri, 4 Aug 2023 12:41:44 -0700 Subject: [PATCH 7/8] Change pipe already registered log from error to warn level (#679) --- .../kafka/connector/internal/SnowflakeSinkServiceV1.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java index 4bbdb8274..ca8645cf2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java @@ -108,7 +108,7 @@ public void startTask(final String tableName, final TopicPartition topicPartitio String stageName = Utils.stageName(conn.getConnectorName(), tableName); String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition()); if (pipes.containsKey(nameIndex)) { - LOGGER.error("task is already registered, name: {}", nameIndex); + LOGGER.warn("task is already registered with {} partition", nameIndex); } else { String pipeName = Utils.pipeName(conn.getConnectorName(), tableName, topicPartition.partition()); From e56154fe4d90f7433f908e5eb027f1bc894aa0de Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Wed, 9 Aug 2023 15:56:45 -0700 Subject: [PATCH 8/8] SNOW-811265 Not clearing entire map of partitionToChannel, only remove (#687) --- .../streaming/SnowflakeSinkServiceV2.java | 7 +- .../streaming/SnowflakeSinkServiceV2IT.java | 90 ++++++++++++++++++- 2 files changed, 93 insertions(+), 4 deletions(-) diff --git 
a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 19bb7339a..1d89401af 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -331,8 +331,13 @@ public void close(Collection partitions) { topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(), topicPartition.topic(), topicPartition.partition()); + partitionsToChannel.remove(partitionChannelKey); }); - partitionsToChannel.clear(); + LOGGER.info( + "Closing {} partitions and remaining partitions which are not closed are:{}, with size:{}", + partitions.size(), + partitionsToChannel.keySet().toString(), + partitionsToChannel.size()); } @Override diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index 71acb037e..8c54e6d39 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -1,5 +1,6 @@ package com.snowflake.kafka.connector.internal.streaming; +import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey; import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; @@ -25,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; @@ -43,9 +43,10 @@ public class SnowflakeSinkServiceV2IT { private String table = TestUtils.randomTableName(); private int partition = 0; private int partition2 = 1; - private String topic = "test"; + + // Topic name should be same as table name. (Only for testing, not necessarily in real deployment) + private String topic = table; private TopicPartition topicPartition = new TopicPartition(topic, partition); - private static ObjectMapper MAPPER = new ObjectMapper(); @After public void afterEach() { @@ -125,6 +126,89 @@ public void testChannelCloseIngestion() throws Exception { service.closeAll(); } + // Two partitions, insert Record, one partition gets rebalanced (closed). + // just before rebalance, there is data in buffer for other partition, + // Send data again for both partitions. 
+ // Successfully able to ingest all records + @Test + public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartitionsAssigned() + throws Exception { + Map config = TestUtils.getConfForStreaming(); + SnowflakeSinkConnectorConfig.setDefaultValues(config); + conn.createTable(table); + TopicPartition tp1 = new TopicPartition(table, partition); + TopicPartition tp2 = new TopicPartition(table, partition2); + + // opens a channel for partition 0, table and topic + SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(5) + .setFlushTime(5) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition))) + .addTask(table, tp1) // Internally calls startTask + .addTask(table, tp2) // Internally calls startTask + .build(); + + final int recordsInPartition1 = 2; + final int recordsInPartition2 = 2; + List recordsPartition1 = + TestUtils.createJsonStringSinkRecords(0, recordsInPartition1, table, partition); + + List recordsPartition2 = + TestUtils.createJsonStringSinkRecords(0, recordsInPartition2, table, partition2); + + List records = new ArrayList<>(recordsPartition1); + records.addAll(recordsPartition2); + + service.insert(records); + + TestUtils.assertWithRetry( + () -> { + // This is how we will trigger flush. (Mimicking poll API) + service.insert(new ArrayList<>()); // trigger time based flush + return TestUtils.tableSize(table) == recordsInPartition1 + recordsInPartition2; + }, + 10, + 20); + + TestUtils.assertWithRetry(() -> service.getOffset(tp1) == recordsInPartition1, 20, 5); + TestUtils.assertWithRetry(() -> service.getOffset(tp2) == recordsInPartition2, 20, 5); + // before you close partition 1, there should be some data in partition 2 + List newRecordsPartition2 = + TestUtils.createJsonStringSinkRecords(2, recordsInPartition1, table, partition2); + service.insert(newRecordsPartition2); + // partitions to close = 1 out of 2 + List partitionsToClose = Collections.singletonList(tp1); + service.close(partitionsToClose); + + // remaining partition should be present in the map + SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2) service; + + Assert.assertTrue( + snowflakeSinkServiceV2 + .getTopicPartitionChannelFromCacheKey(partitionChannelKey(tp2.topic(), tp2.partition())) + .isPresent()); + + List newRecordsPartition1 = + TestUtils.createJsonStringSinkRecords(2, recordsInPartition1, table, partition); + + List newRecords2Partition2 = + TestUtils.createJsonStringSinkRecords(4, recordsInPartition1, table, partition2); + List newrecords = new ArrayList<>(newRecordsPartition1); + newrecords.addAll(newRecords2Partition2); + + service.insert(newrecords); + TestUtils.assertWithRetry( + () -> { + // This is how we will trigger flush. (Mimicking poll API) + service.insert(new ArrayList<>()); // trigger time based flush + return TestUtils.tableSize(table) == recordsInPartition1 * 5; + }, + 10, + 20); + } + @Test public void testRebalanceOpenCloseIngestion() throws Exception { Map config = TestUtils.getConfForStreaming();