diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
index 15472c041..47c90f637 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -164,7 +164,7 @@ public class SnowflakeSinkConnectorConfig {
"enable.streaming.client.optimization";
public static final String ENABLE_STREAMING_CLIENT_OPTIMIZATION_DISPLAY =
"Enable streaming client optimization";
- public static final boolean ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT = false;
+ public static final boolean ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT = true;
public static final String ENABLE_STREAMING_CLIENT_OPTIMIZATION_DOC =
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";
diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java
index 16c164df5..256730c18 100644
--- a/src/main/java/com/snowflake/kafka/connector/Utils.java
+++ b/src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -70,7 +70,7 @@
public class Utils {
// Connector version, change every release
- public static final String VERSION = "1.9.3";
+ public static final String VERSION = "2.0.0";
// connector parameter list
public static final String NAME = "name";
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index 4bbdb8274..ca8645cf2 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -108,7 +108,7 @@ public void startTask(final String tableName, final TopicPartition topicPartitio
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
- LOGGER.error("task is already registered, name: {}", nameIndex);
+ LOGGER.warn("task is already registered with {} partition", nameIndex);
} else {
String pipeName =
Utils.pipeName(conn.getConnectorName(), tableName, topicPartition.partition());
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
index e2e572246..1d89401af 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -266,7 +266,9 @@ public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(topicPartition.topic(), topicPartition.partition());
if (partitionsToChannel.containsKey(partitionChannelKey)) {
- return partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
+ long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
+ partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
+ return offset;
} else {
LOGGER.warn(
"Topic: {} Partition: {} hasn't been initialized to get offset",
@@ -302,15 +304,14 @@ public void closeAll() {
/**
* This function is called during rebalance.
*
- * <p>We don't close the channels. Upon rebalance, (inside {@link
- * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel
- * anyways. [Check c'tor of {@link TopicPartitionChannel}]
+ * <p>All the channels are closed. The client is still active. Upon rebalance, (inside {@link
+ * com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel.
*
* <p>We will wipe the cache partitionsToChannel so that in {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we reinstantiate and fetch
* offsetToken
*
- * @param partitions a list of topic partitions to close/shutdown
+ * @param partitions a list of topic partition
*/
@Override
public void close(Collection<TopicPartition> partitions) {
@@ -322,17 +323,21 @@ public void close(Collection<TopicPartition> partitions) {
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that something goes wrong even before the
// channels are created
+ if (topicPartitionChannel != null) {
+ topicPartitionChannel.closeChannel();
+ }
LOGGER.info(
- "Removing partitionChannel:{}, partition:{}, topic:{} from map(partitionsToChannel)",
+ "Closing partitionChannel:{}, partition:{}, topic:{}",
topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(),
topicPartition.topic(),
topicPartition.partition());
+ partitionsToChannel.remove(partitionChannelKey);
});
LOGGER.info(
- "Closing {} partitions and Clearing partitionsToChannel Map of size:{}",
+ "Closing {} partitions and remaining partitions which are not closed are:{}, with size:{}",
partitions.size(),
+ partitionsToChannel.keySet().toString(),
partitionsToChannel.size());
- partitionsToChannel.clear();
}
@Override
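
As a reading aid for the hunk above, here is a small self-contained sketch (not the connector classes) of the new rebalance close flow: each closed partition's channel is shut down and only its own entry is removed from the cache, instead of clearing the whole map, so channels for partitions that stay assigned keep their cached state. The key format and the AutoCloseable stand-in for TopicPartitionChannel are assumptions made for illustration.

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RebalanceCloseSketch {
  // Stand-in for partitionsToChannel; AutoCloseable plays the role of TopicPartitionChannel.
  static final Map<String, AutoCloseable> partitionsToChannel = new ConcurrentHashMap<>();

  // Assumed key format, for illustration only.
  static String partitionChannelKey(String topic, int partition) {
    return topic + "_" + partition;
  }

  static void close(Iterable<String> keysToClose) {
    for (String key : keysToClose) {
      AutoCloseable channel = partitionsToChannel.get(key);
      try {
        if (channel != null) {
          channel.close(); // stands in for TopicPartitionChannel#closeChannel()
        }
      } catch (Exception ignored) {
        // a real implementation would log here
      }
      partitionsToChannel.remove(key); // remove only this partition's entry, not the whole map
    }
  }

  public static void main(String[] args) {
    partitionsToChannel.put(partitionChannelKey("topic", 0), () -> System.out.println("closed 0"));
    partitionsToChannel.put(partitionChannelKey("topic", 1), () -> System.out.println("closed 1"));
    close(Collections.singletonList(partitionChannelKey("topic", 0)));
    System.out.println("remaining channels: " + partitionsToChannel.keySet());
  }
}
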
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
index 0babd1edf..14d68e8c6 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
@@ -83,17 +83,24 @@ public class TopicPartitionChannel {
// -------- private final fields -------- //
- // This offset is updated when Snowflake has received offset from insertRows API
- // We will update this value after calling offsetToken API for this channel
+ // This offset represents the data persisted in Snowflake. More specifically it is the Snowflake
+ // offset determined from the insertRows API call. It is set after calling the fetchOffsetToken
+ // API for this channel
private final AtomicLong offsetPersistedInSnowflake =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
- // This offset is updated every time KC processed an offset, it's used to make sure that we won't
- // process records that are already processed. It will be reset to the latest committed token
- // every time we fetch it from Snowflake
+ // This offset represents the data buffered in KC. More specifically it is the KC offset to ensure
+ // exactly once functionality. On creation it is set to the latest committed token in Snowflake
+ // (see offsetPersistedInSnowflake) and updated on each new row from KC.
private final AtomicLong processedOffset =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
+ // This offset is a fallback to represent the data buffered in KC. It is similar to
+ // processedOffset, however it is only used to resend the offset when the channel offset token is
+ // NULL. It is updated to the first offset sent by KC (see processedOffset) or the offset
+ // persisted in Snowflake (see offsetPersistedInSnowflake)
+ private long latestConsumerOffset = NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
+
/**
* Offsets are reset in kafka when one of following cases arises in which we rely on source of
* truth (Which is Snowflake's committed offsetToken)
@@ -263,6 +270,11 @@ public TopicPartitionChannel(
this.processedOffset.set(lastCommittedOffsetToken);
if (lastCommittedOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.sinkTaskContext.offset(this.topicPartition, lastCommittedOffsetToken + 1L);
+ } else {
+ LOGGER.info(
+ "TopicPartitionChannel:{}, offset token is NULL, will rely on Kafka to send us the"
+ + " correct offset instead",
+ this.getChannelName());
}
}
@@ -281,6 +293,11 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get();
final long currentProcessedOffset = this.processedOffset.get();
+ // Set the consumer offset to be the first record that Kafka sends us
+ if (latestConsumerOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
+ latestConsumerOffset = kafkaSinkRecord.kafkaOffset();
+ }
+
// Ignore adding to the buffer until we see the expected offset value
if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, currentProcessedOffset)) {
return;
@@ -345,7 +362,10 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
*/
private boolean shouldIgnoreAddingRecordToBuffer(
SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
- if (!isOffsetResetInKafka) {
+ // Don't skip rows if there is no offset reset or there is no offset token information in the
+ // channel
+ if (!isOffsetResetInKafka
+ || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
return false;
}
@@ -496,13 +516,8 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert
// Due to schema evolution, we may need to reopen the channel and reset the offset in kafka
// since it's possible that not all rows are ingested
if (response.needToResetOffset()) {
- final long offsetRecoveredFromSnowflake =
- streamingApiFallbackSupplier(
- StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
- // If there is no valid offset token at server side even after the reset, retry it again
- if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
- insertBufferedRecords(streamingBufferToInsert);
- }
+ streamingApiFallbackSupplier(
+ StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
}
return response;
} catch (TopicPartitionChannelInsertionException ex) {
@@ -639,9 +654,6 @@ public InsertRowsResponse get() throws Throwable {
}
}
}
- // TODO SNOW-758492: for schematization, sleep a few seconds after the insert to avoid
- // combining channels with different schemas
- Thread.sleep(2000);
}
return new InsertRowsResponse(finalResponse, needToResetOffset);
}
@@ -877,20 +889,25 @@ private long streamingApiFallbackSupplier(
private void resetChannelMetadataAfterRecovery(
final StreamingApiFallbackInvoker streamingApiFallbackInvoker,
final long offsetRecoveredFromSnowflake) {
- // If we don't get a valid offset token from server side, reset the processed offset to invalid
- // and rely on kafka to send us the correct data
if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
- this.offsetPersistedInSnowflake.set(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
- this.processedOffset.set(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
- LOGGER.warn(
- "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, skip reset kafka offset",
+ LOGGER.info(
+ "{} Channel:{}, offset token is NULL, will use the consumer offset managed by the"
+ + " connector instead, consumer offset:{}",
streamingApiFallbackInvoker,
this.getChannelName(),
- offsetRecoveredFromSnowflake);
+ latestConsumerOffset);
+ }
+
+ // If the offset token in the channel is null, use the consumer offset managed by the connector;
+ // otherwise use the offset token stored in the channel
+ final long offsetToResetInKafka =
+ offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
+ ? latestConsumerOffset
+ : offsetRecoveredFromSnowflake + 1L;
+ if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
return;
}
- final long offsetToResetInKafka = offsetRecoveredFromSnowflake + 1L;
// reset the buffer
this.bufferLock.lock();
try {
@@ -916,7 +933,7 @@ private void resetChannelMetadataAfterRecovery(
this.bufferLock.unlock();
}
LOGGER.warn(
- "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, Reset kafka offset to:{}",
+ "{} Channel:{}, OffsetRecoveredFromSnowflake:{}, reset kafka offset to:{}",
streamingApiFallbackInvoker,
this.getChannelName(),
offsetRecoveredFromSnowflake,
@@ -954,26 +971,15 @@ private long getRecoveredOffsetFromSnowflake(
* snowflake.
*/
private long fetchLatestCommittedOffsetFromSnowflake() {
- LOGGER.debug(
- "Fetching last committed offset for partition channel:{}",
- this.channel.getFullyQualifiedName());
+ LOGGER.debug("Fetching last committed offset for partition channel:{}", this.getChannelName());
String offsetToken = null;
try {
offsetToken = this.channel.getLatestCommittedOffsetToken();
- if (offsetToken == null) {
- LOGGER.warn(
- "OffsetToken not present for channelName:{}, will rely on kafka consumer offset as"
- + " source of truth",
- this.getChannelName());
- return NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
- } else {
- long latestCommittedOffsetInSnowflake = Long.parseLong(offsetToken);
- LOGGER.info(
- "Fetched offsetToken:{} for channelName:{}",
- latestCommittedOffsetInSnowflake,
- this.channel.getFullyQualifiedName());
- return latestCommittedOffsetInSnowflake;
- }
+ LOGGER.info(
+ "Fetched offsetToken for channelName:{}, offset:{}", this.getChannelName(), offsetToken);
+ return offsetToken == null
+ ? NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
+ : Long.parseLong(offsetToken);
} catch (NumberFormatException ex) {
LOGGER.error(
"The offsetToken string does not contain a parsable long:{} for channel:{}",
@@ -1075,6 +1081,12 @@ protected SnowflakeTelemetryService getTelemetryServiceV2() {
return this.telemetryServiceV2;
}
+ protected void setLatestConsumerOffset(long consumerOffset) {
+ if (consumerOffset > this.latestConsumerOffset) {
+ this.latestConsumerOffset = consumerOffset;
+ }
+ }
+
/**
* Converts the original kafka sink record into a Json Record. i.e key and values are converted
* into Json so that it can be used to insert into variant column of Snowflake Table.
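
The offset bookkeeping introduced in this file reduces to one decision at recovery time. The following standalone sketch is illustrative only; the sentinel value and method shapes are assumptions, while the decision rule itself (fall back to the connector-managed consumer offset when the channel's offset token is NULL, otherwise reset Kafka to the recovered offset plus one, and skip the reset when neither source has a value) comes from the hunks above.

public class OffsetFallbackSketch {
  // Illustrative sentinel; mirrors NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE in spirit.
  static final long NO_OFFSET = -1L;

  // Highest offset seen from the Kafka consumer; only ever moves forward.
  // In the patch this is fed from the first record seen by insertRecordToBuffer and from getOffset.
  private long latestConsumerOffset = NO_OFFSET;

  void setLatestConsumerOffset(long consumerOffset) {
    if (consumerOffset > latestConsumerOffset) {
      latestConsumerOffset = consumerOffset;
    }
  }

  // Where should the Kafka consumer be reset after a streaming API fallback?
  // Returns NO_OFFSET when there is nothing to reset to (both sources are empty).
  long offsetToResetInKafka(long offsetRecoveredFromSnowflake) {
    return offsetRecoveredFromSnowflake == NO_OFFSET
        ? latestConsumerOffset               // offset token is NULL: trust the consumer offset
        : offsetRecoveredFromSnowflake + 1L; // resume just after what Snowflake has persisted
  }

  public static void main(String[] args) {
    OffsetFallbackSketch channel = new OffsetFallbackSketch();
    System.out.println(channel.offsetToResetInKafka(NO_OFFSET)); // -1: skip the reset
    channel.setLatestConsumerOffset(42L);
    System.out.println(channel.offsetToResetInKafka(NO_OFFSET)); // 42: use consumer offset
    System.out.println(channel.offsetToResetInKafka(100L));      // 101: recovered offset + 1
  }
}
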
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java
index abb940725..352615a34 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java
@@ -9,27 +9,17 @@
public abstract class SnowflakeTelemetryBasicInfo {
final String tableName;
- /**
- * User created or user visible stage name.
- *
- * It is null for Snowpipe Streaming Telemetry.
- */
- final String stageName;
-
static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());
/**
* Base Constructor. Accepts a tableName and StageName.
*
* @param tableName Checks for Nullability
- * @param stageName Can be null (In case of Snowpipe Streaming since there is no user visible
- * Snowflake Stage)
*/
- public SnowflakeTelemetryBasicInfo(final String tableName, final String stageName) {
+ public SnowflakeTelemetryBasicInfo(final String tableName) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty");
this.tableName = tableName;
- this.stageName = stageName;
}
/**
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java
index 2f6cbd090..627f03cf0 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java
@@ -24,11 +24,13 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo
int fileCountReprocessPurge =
0; // files on stage that are purged due to reprocessing when cleaner starts
long startTime; // start time of the pipe
+ private final String stageName;
private final String pipeName;
public SnowflakeTelemetryPipeCreation(
final String tableName, final String stageName, final String pipeName) {
- super(tableName, stageName);
+ super(tableName);
+ this.stageName = stageName;
this.pipeName = pipeName;
this.startTime = System.currentTimeMillis();
}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java
index 82c748d3c..3efd397b5 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java
@@ -1,6 +1,10 @@
package com.snowflake.kafka.connector.internal.telemetry;
-import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.*;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_SUB_DOMAIN;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.FILE_COUNT_TABLE_STAGE_INGESTION_FAIL;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.LATENCY_SUB_DOMAIN;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.OFFSET_SUB_DOMAIN;
+import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_FILE_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_COMMIT_LAG_MS;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_INGESTION_LAG_FILE_COUNT;
@@ -9,23 +13,34 @@
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.AVERAGE_KAFKA_LAG_RECORD_COUNT;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.BYTE_NUMBER;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.CLEANER_RESTART_COUNT;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.COMMITTED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.END_TIME;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_INGESTION;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_ON_STAGE;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_PURGED;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_BROKEN_RECORD;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_INGEST_FAIL;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FLUSHED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.MEMORY_USAGE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PIPE_NAME;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PROCESSED_OFFSET;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PURGED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.RECORD_NUMBER;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.STAGE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME;
-import com.codahale.metrics.*;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil.EventType;
-import java.util.*;
+import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -108,6 +123,7 @@ public class SnowflakeTelemetryPipeStatus extends SnowflakeTelemetryBasicInfo {
// May not be set if jmx is set to false
private Meter fileCountTableStageBrokenRecordMeter, fileCountTableStageIngestFailMeter;
+ private final String stageName;
private final String pipeName;
public SnowflakeTelemetryPipeStatus(
@@ -116,7 +132,8 @@ public SnowflakeTelemetryPipeStatus(
final String pipeName,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter) {
- super(tableName, stageName);
+ super(tableName);
+ this.stageName = stageName;
this.pipeName = pipeName;
// Initial value of processed/flushed/committed/purged offset should be set to -1,
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java
index 7fc097a1b..1a1ac72d1 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java
@@ -9,21 +9,21 @@ public final class TelemetryConstants {
public static final String STAGE_NAME = "stage_name";
public static final String PIPE_NAME = "pipe_name";
- public static final String PROCESSED_OFFSET = "processed_offset";
- public static final String FLUSHED_OFFSET = "flushed_offset";
- public static final String COMMITTED_OFFSET = "committed_offset";
- public static final String PURGED_OFFSET = "purged_offset";
+ public static final String PROCESSED_OFFSET = "processed-offset";
+ public static final String FLUSHED_OFFSET = "flushed-offset";
+ public static final String COMMITTED_OFFSET = "committed-offset";
+ public static final String PURGED_OFFSET = "purged-offset";
public static final String RECORD_NUMBER = "record_number";
public static final String BYTE_NUMBER = "byte_number";
- public static final String FILE_COUNT_ON_STAGE = "file_count_on_stage";
- public static final String FILE_COUNT_ON_INGESTION = "file_count_on_ingestion";
- public static final String FILE_COUNT_PURGED = "file_count_purged";
+ public static final String FILE_COUNT_ON_STAGE = "file-count-on-stage";
+ public static final String FILE_COUNT_ON_INGESTION = "file-count-on-ingestion";
+ public static final String FILE_COUNT_PURGED = "file-count-purged";
public static final String FILE_COUNT_TABLE_STAGE_INGEST_FAIL =
"file_count_table_stage_ingest_fail";
public static final String FILE_COUNT_TABLE_STAGE_BROKEN_RECORD =
- "file_count_table_stage_broken_record";
+ "file-count-table-stage-broken-record";
public static final String CLEANER_RESTART_COUNT = "cleaner_restart_count";
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index 6657774f2..11ec8cae5 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -1,9 +1,9 @@
package com.snowflake.kafka.connector.internal.streaming;
+import static com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.partitionChannelKey;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
-import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.SchematizationTestUtils;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
@@ -27,14 +27,11 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
-import net.snowflake.ingest.utils.SFException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
@@ -50,9 +47,10 @@ public class SnowflakeSinkServiceV2IT {
private String table = TestUtils.randomTableName();
private int partition = 0;
private int partition2 = 1;
- private String topic = "test";
+
+ // Topic name should be the same as the table name (only for testing, not necessarily in a real deployment)
+ private String topic = table;
private TopicPartition topicPartition = new TopicPartition(topic, partition);
- private static ObjectMapper MAPPER = new ObjectMapper();
// use OAuth as authenticator or not
private boolean useOAuth;
@@ -155,6 +153,89 @@ public void testChannelCloseIngestion() throws Exception {
service.closeAll();
}
+ // Two partitions; insert records, then one partition gets rebalanced (closed).
+ // Just before the rebalance there is data in the buffer for the other partition.
+ // Send data again for both partitions.
+ // All records should be ingested successfully.
+ @Test
+ public void testStreamingIngest_multipleChannelPartitions_closeSubsetOfPartitionsAssigned()
+ throws Exception {
+ Map<String, String> config = TestUtils.getConfForStreaming();
+ SnowflakeSinkConnectorConfig.setDefaultValues(config);
+ conn.createTable(table);
+ TopicPartition tp1 = new TopicPartition(table, partition);
+ TopicPartition tp2 = new TopicPartition(table, partition2);
+
+ // opens a channel for partition 0, table and topic
+ SnowflakeSinkService service =
+ SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+ .setRecordNumber(5)
+ .setFlushTime(5)
+ .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+ .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+ .addTask(table, tp1) // Internally calls startTask
+ .addTask(table, tp2) // Internally calls startTask
+ .build();
+
+ final int recordsInPartition1 = 2;
+ final int recordsInPartition2 = 2;
+ List<SinkRecord> recordsPartition1 =
+ TestUtils.createJsonStringSinkRecords(0, recordsInPartition1, table, partition);
+
+ List<SinkRecord> recordsPartition2 =
+ TestUtils.createJsonStringSinkRecords(0, recordsInPartition2, table, partition2);
+
+ List<SinkRecord> records = new ArrayList<>(recordsPartition1);
+ records.addAll(recordsPartition2);
+
+ service.insert(records);
+
+ TestUtils.assertWithRetry(
+ () -> {
+ // This is how we will trigger flush. (Mimicking poll API)
+ service.insert(new ArrayList<>()); // trigger time based flush
+ return TestUtils.tableSize(table) == recordsInPartition1 + recordsInPartition2;
+ },
+ 10,
+ 20);
+
+ TestUtils.assertWithRetry(() -> service.getOffset(tp1) == recordsInPartition1, 20, 5);
+ TestUtils.assertWithRetry(() -> service.getOffset(tp2) == recordsInPartition2, 20, 5);
+ // before you close partition 1, there should be some data in partition 2
+ List<SinkRecord> newRecordsPartition2 =
+ TestUtils.createJsonStringSinkRecords(2, recordsInPartition1, table, partition2);
+ service.insert(newRecordsPartition2);
+ // partitions to close = 1 out of 2
+ List<TopicPartition> partitionsToClose = Collections.singletonList(tp1);
+ service.close(partitionsToClose);
+
+ // remaining partition should be present in the map
+ SnowflakeSinkServiceV2 snowflakeSinkServiceV2 = (SnowflakeSinkServiceV2) service;
+
+ Assert.assertTrue(
+ snowflakeSinkServiceV2
+ .getTopicPartitionChannelFromCacheKey(partitionChannelKey(tp2.topic(), tp2.partition()))
+ .isPresent());
+
+ List<SinkRecord> newRecordsPartition1 =
+ TestUtils.createJsonStringSinkRecords(2, recordsInPartition1, table, partition);
+
+ List<SinkRecord> newRecords2Partition2 =
+ TestUtils.createJsonStringSinkRecords(4, recordsInPartition1, table, partition2);
+ List<SinkRecord> newrecords = new ArrayList<>(newRecordsPartition1);
+ newrecords.addAll(newRecords2Partition2);
+
+ service.insert(newrecords);
+ TestUtils.assertWithRetry(
+ () -> {
+ // This is how we will trigger flush. (Mimicking poll API)
+ service.insert(new ArrayList<>()); // trigger time based flush
+ return TestUtils.tableSize(table) == recordsInPartition1 * 5;
+ },
+ 10,
+ 20);
+ }
+
@Test
public void testRebalanceOpenCloseIngestion() throws Exception {
Map<String, String> config = getConfig();
@@ -790,23 +871,7 @@ public void testBrokenRecordIngestionAfterValidRecord() throws Exception {
service.closeAll();
}
-
- @Test(expected = ConnectException.class)
- public void testMissingPropertiesForStreamingClient() {
- Map<String, String> config = getConfig();
- config.remove(Utils.SF_ROLE);
- SnowflakeSinkConnectorConfig.setDefaultValues(config);
-
- try {
- SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
- .build();
- } catch (ConnectException ex) {
- assert ex.getCause() instanceof SFException;
- assert ex.getCause().getMessage().contains("Missing role");
- throw ex;
- }
- }
-
+
/* Service start -> Insert -> Close. service start -> fetch the offsetToken, compare and ingest check data */
@Test
@@ -976,7 +1041,6 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);
long startOffset = 0;
- long endOffset = 0;
SinkRecord avroRecordValue =
new SinkRecord(
@@ -996,12 +1060,22 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.addTask(table, new TopicPartition(topic, partition))
.build();
- // Schema evolution should kick in to update the schema and then the rows should be ingested
+ // The first insert should fail and schema evolution will kick in to update the schema
service.insert(avroRecordValue);
TestUtils.assertWithRetry(
- () -> service.getOffset(new TopicPartition(topic, partition)) == endOffset + 1, 20, 5);
+ () ->
+ service.getOffset(new TopicPartition(topic, partition))
+ == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
+ 20,
+ 5);
TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION);
+
+ // Retry the insert should succeed now with the updated schema
+ service.insert(avroRecordValue);
+ TestUtils.assertWithRetry(
+ () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);
+
TestUtils.checkTableContentOneRow(
table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION);
@@ -1056,7 +1130,6 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception {
SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted);
long startOffset = 0;
- long endOffset = 0;
SinkRecord jsonRecordValue =
new SinkRecord(
@@ -1076,11 +1149,21 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception {
.addTask(table, new TopicPartition(topic, partition))
.build();
- // Schema evolution should kick in to update the schema and then the rows should be ingested
+ // The first insert should fail and schema evolution will kick in to update the schema
service.insert(jsonRecordValue);
TestUtils.assertWithRetry(
- () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);
+ () ->
+ service.getOffset(new TopicPartition(topic, partition))
+ == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
+ 20,
+ 5);
TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_JSON_SCHEMA_FOR_TABLE_CREATION);
+
+ // Retry the insert should succeed now with the updated schema
+ service.insert(jsonRecordValue);
+ TestUtils.assertWithRetry(
+ () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);
+
TestUtils.checkTableContentOneRow(
table, SchematizationTestUtils.CONTENT_FOR_JSON_TABLE_CREATION);
@@ -1130,7 +1213,26 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce
.addTask(table, new TopicPartition(topic, partition))
.build();
- // Schema evolution should kick in to update the schema and then the rows should be ingested
+ // The first insert should fail and schema evolution will kick in to add the column
+ service.insert(jsonRecordValue);
+ TestUtils.assertWithRetry(
+ () ->
+ service.getOffset(new TopicPartition(topic, partition))
+ == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
+ 20,
+ 5);
+
+ // The second insert should fail again and schema evolution will kick in to update the
+ // nullability
+ service.insert(jsonRecordValue);
+ TestUtils.assertWithRetry(
+ () ->
+ service.getOffset(new TopicPartition(topic, partition))
+ == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
+ 20,
+ 5);
+
+ // Retry the insert should succeed now with the updated schema
service.insert(jsonRecordValue);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);
diff --git a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
new file mode 100644
index 000000000..38e50ab1b
--- /dev/null
+++ b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json
@@ -0,0 +1,28 @@
+{
+ "name": "SNOWFLAKE_CONNECTOR_NAME",
+ "config": {
+ "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
+ "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1",
+ "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME",
+ "tasks.max": "1",
+ "buffer.flush.time": "10",
+ "buffer.count.records": "100",
+ "buffer.size.bytes": "5000000",
+ "snowflake.url.name": "SNOWFLAKE_HOST",
+ "snowflake.user.name": "SNOWFLAKE_USER",
+ "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY",
+ "snowflake.database.name": "SNOWFLAKE_DATABASE",
+ "snowflake.schema.name": "SNOWFLAKE_SCHEMA",
+ "snowflake.role.name": "SNOWFLAKE_ROLE",
+ "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
+ "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+ "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "value.converter.schemas.enable": "false",
+ "jmx": "true",
+ "errors.tolerance": "all",
+ "errors.log.enable": true,
+ "errors.deadletterqueue.topic.name": "DLQ_TOPIC",
+ "errors.deadletterqueue.topic.replication.factor": 1,
+ "snowflake.enable.schematization": true
+ }
+}
\ No newline at end of file
diff --git a/test/test_suit/test_schema_evolution_drop_table.py b/test/test_suit/test_schema_evolution_drop_table.py
index 8b303176c..06c2cf49f 100644
--- a/test/test_suit/test_schema_evolution_drop_table.py
+++ b/test/test_suit/test_schema_evolution_drop_table.py
@@ -46,7 +46,7 @@ def send(self):
self.driver.sendBytesData(self.topic, value, key)
# Sleep for some time and then verify the rows are ingested
- sleep(60)
+ sleep(120)
self.verify("0")
# Recreate the table
@@ -56,7 +56,6 @@ def send(self):
"alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table))
# Ingest another set of rows
- sleep(30)
self.driver.sendBytesData(self.topic, value, key)
def verify(self, round):
diff --git a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py
new file mode 100644
index 000000000..b72a24408
--- /dev/null
+++ b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py
@@ -0,0 +1,109 @@
+import json
+from time import sleep
+
+from test_suit.test_utils import NonRetryableError
+
+
+# test if the table is updated with the correct columns, and if the table is
+# recreated and updated after it is dropped
+class TestSchemaEvolutionMultiTopicDropTable:
+ def __init__(self, driver, nameSalt):
+ self.driver = driver
+ self.fileName = "travis_correct_schema_evolution_multi_topic_drop_table"
+ self.topics = []
+ self.table = self.fileName + nameSalt
+ self.recordNum = 100
+
+ for i in range(2):
+ self.topics.append(self.table + str(i))
+
+ self.driver.snowflake_conn.cursor().execute(
+ "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table))
+
+ self.driver.snowflake_conn.cursor().execute(
+ "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table))
+
+ self.records = []
+
+ self.records.append({
+ 'PERFORMANCE_STRING': 'Excellent',
+ '"case_sensitive_PERFORMANCE_CHAR"': 'A',
+ 'RATING_INT': 100
+ })
+
+ self.records.append({
+ 'PERFORMANCE_STRING': 'Excellent',
+ 'RATING_DOUBLE': 0.99,
+ 'APPROVAL': True
+ })
+
+ self.gold_type = {
+ 'PERFORMANCE_STRING': 'VARCHAR',
+ 'case_sensitive_PERFORMANCE_CHAR': 'VARCHAR',
+ 'RATING_INT': 'NUMBER',
+ 'RATING_DOUBLE': 'FLOAT',
+ 'APPROVAL': 'BOOLEAN',
+ 'RECORD_METADATA': 'VARIANT'
+ }
+
+ self.gold_columns = [columnName for columnName in self.gold_type]
+
+ def getConfigFileName(self):
+ return self.fileName + ".json"
+
+ def send(self):
+ for i, topic in enumerate(self.topics):
+ key = []
+ value = []
+ for e in range(self.recordNum):
+ key.append(json.dumps({'number': str(e)}).encode('utf-8'))
+ value.append(json.dumps(self.records[i]).encode('utf-8'))
+ self.driver.sendBytesData(topic, value, key)
+
+ # Sleep for some time and then verify the rows are ingested
+ sleep(120)
+ self.verify("0")
+
+ # Recreate the table
+ self.driver.snowflake_conn.cursor().execute(
+ "Create or replace table {} (PERFORMANCE_STRING STRING, RECORD_METADATA VARIANT)".format(self.table))
+ self.driver.snowflake_conn.cursor().execute(
+ "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table))
+
+ # Ingest another set of rows
+ for i, topic in enumerate(self.topics):
+ key = []
+ value = []
+ for e in range(self.recordNum):
+ key.append(json.dumps({'number': str(e)}).encode('utf-8'))
+ value.append(json.dumps(self.records[i]).encode('utf-8'))
+ self.driver.sendBytesData(topic, value, key)
+
+ def verify(self, round):
+ rows = self.driver.snowflake_conn.cursor().execute(
+ "desc table {}".format(self.table)).fetchall()
+ res_col = {}
+
+ gold_columns_copy = self.gold_columns.copy()
+
+ for index, row in enumerate(rows):
+ gold_columns_copy.remove(row[0])
+ if not row[1].startswith(self.gold_type[row[0]]):
+ raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1],
+ self.gold_type[
+ row[0]]))
+ res_col[row[0]] = index
+
+ print("Columns not in table: ", gold_columns_copy)
+
+ for columnName in gold_columns_copy:
+ raise NonRetryableError("Column {} was not created".format(columnName))
+
+ res = self.driver.snowflake_conn.cursor().execute(
+ "SELECT count(*) FROM {}".format(self.table)).fetchone()[0]
+ if res != self.recordNum * len(self.topics):
+ print("Number of record expected: {}, got: {}".format(self.recordNum * len(self.topics), res))
+ raise NonRetryableError("Number of record in table is different from number of record sent")
+
+ def clean(self):
+ self.driver.cleanTableStagePipe(self.table)
diff --git a/test/test_suites.py b/test/test_suites.py
index d1dcba794..4ce1a26c5 100644
--- a/test/test_suites.py
+++ b/test/test_suites.py
@@ -1,48 +1,5 @@
from collections import OrderedDict
-from test_suit.test_string_json import TestStringJson
-from test_suit.test_json_json import TestJsonJson
-from test_suit.test_string_avro import TestStringAvro
-from test_suit.test_avro_avro import TestAvroAvro
-from test_suit.test_string_avrosr import TestStringAvrosr
-from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr
-
-from test_suit.test_native_string_avrosr import TestNativeStringAvrosr
-from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema
-from test_suit.test_native_complex_smt import TestNativeComplexSmt
-
-from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
-from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
-
-from test_suit.test_snowpipe_streaming_string_json import TestSnowpipeStreamingStringJson
-from test_suit.test_snowpipe_streaming_string_json_dlq import TestSnowpipeStreamingStringJsonDLQ
-from test_suit.test_snowpipe_streaming_string_avro_sr import TestSnowpipeStreamingStringAvroSR
-
-from test_suit.test_multiple_topic_to_one_table_snowpipe_streaming import \
- TestMultipleTopicToOneTableSnowpipeStreaming
-from test_suit.test_multiple_topic_to_one_table_snowpipe import TestMultipleTopicToOneTableSnowpipe
-
-from test_suit.test_schema_mapping import TestSchemaMapping
-
-from test_suit.test_auto_table_creation import TestAutoTableCreation
-from test_suit.test_auto_table_creation_topic2table import TestAutoTableCreationTopic2Table
-
-from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
-from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR
-
-from test_suit.test_schema_evolution_w_auto_table_creation_json import \
- TestSchemaEvolutionWithAutoTableCreationJson
-from test_suit.test_schema_evolution_w_auto_table_creation_avro_sr import \
- TestSchemaEvolutionWithAutoTableCreationAvroSR
-
-from test_suit.test_schema_evolution_nonnullable_json import TestSchemaEvolutionNonNullableJson
-
-from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter
-
-from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
-
-from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ
-# res tests
from test_suit.resilience_tests.test_kc_delete_create import TestKcDeleteCreate
from test_suit.resilience_tests.test_kc_delete_create_chaos import TestKcDeleteCreateChaos
from test_suit.resilience_tests.test_kc_delete_resume import TestKcDeleteResume
@@ -54,6 +11,38 @@
from test_suit.resilience_tests.test_kc_recreate import TestKcRecreate
from test_suit.resilience_tests.test_kc_recreate_chaos import TestKcRecreateChaos
from test_suit.resilience_tests.test_kc_restart import TestKcRestart
+from test_suit.test_auto_table_creation import TestAutoTableCreation
+from test_suit.test_auto_table_creation_topic2table import TestAutoTableCreationTopic2Table
+from test_suit.test_avro_avro import TestAvroAvro
+from test_suit.test_avrosr_avrosr import TestAvrosrAvrosr
+from test_suit.test_confluent_protobuf_protobuf import TestConfluentProtobufProtobuf
+from test_suit.test_json_json import TestJsonJson
+from test_suit.test_multiple_topic_to_one_table_snowpipe import TestMultipleTopicToOneTableSnowpipe
+from test_suit.test_multiple_topic_to_one_table_snowpipe_streaming import \
+ TestMultipleTopicToOneTableSnowpipeStreaming
+from test_suit.test_native_complex_smt import TestNativeComplexSmt
+from test_suit.test_native_string_avrosr import TestNativeStringAvrosr
+from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema
+from test_suit.test_native_string_protobuf import TestNativeStringProtobuf
+from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR
+from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable
+from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson
+from test_suit.test_schema_evolution_multi_topic_drop_table import TestSchemaEvolutionMultiTopicDropTable
+from test_suit.test_schema_evolution_nonnullable_json import TestSchemaEvolutionNonNullableJson
+from test_suit.test_schema_evolution_w_auto_table_creation_avro_sr import \
+ TestSchemaEvolutionWithAutoTableCreationAvroSR
+from test_suit.test_schema_evolution_w_auto_table_creation_json import \
+ TestSchemaEvolutionWithAutoTableCreationJson
+from test_suit.test_schema_mapping import TestSchemaMapping
+from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter
+from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ
+from test_suit.test_snowpipe_streaming_string_avro_sr import TestSnowpipeStreamingStringAvroSR
+from test_suit.test_snowpipe_streaming_string_json import TestSnowpipeStreamingStringJson
+from test_suit.test_snowpipe_streaming_string_json_dlq import TestSnowpipeStreamingStringJsonDLQ
+from test_suit.test_string_avro import TestStringAvro
+from test_suit.test_string_avrosr import TestStringAvrosr
+from test_suit.test_string_json import TestStringJson
+
class EndToEndTestSuite:
'''
@@ -61,6 +50,7 @@ class EndToEndTestSuite:
Just modify the caller constructor of this class to disable, enable in confluent or Apache Kafka.
In future can add whether it runs in snowpipe or snowpipe streaming mode.
'''
+
def __init__(self, test_instance, clean, run_in_confluent, run_in_apache):
self._test_instance = test_instance
self._clean = clean
@@ -83,6 +73,7 @@ def run_in_confluent(self):
def run_in_apache(self):
return self._run_in_apache
+
def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testSet):
'''
Creates all End to End tests which needs to run against Confluent Kafka or Apache Kafka.
@@ -112,91 +103,114 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
test_instance=TestAvrosrAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
)),
("TestNativeStringAvrosr", EndToEndTestSuite(
- test_instance=TestNativeStringAvrosr(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
+ test_instance=TestNativeStringAvrosr(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=False
)),
("TestNativeStringJsonWithoutSchema", EndToEndTestSuite(
- test_instance=TestNativeStringJsonWithoutSchema(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestNativeStringJsonWithoutSchema(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestNativeComplexSmt", EndToEndTestSuite(
test_instance=TestNativeComplexSmt(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestNativeStringProtobuf", EndToEndTestSuite(
- test_instance=TestNativeStringProtobuf(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestNativeStringProtobuf(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestConfluentProtobufProtobuf", EndToEndTestSuite(
- test_instance=TestConfluentProtobufProtobuf(driver, nameSalt), clean=True, run_in_confluent=False, run_in_apache=False
+ test_instance=TestConfluentProtobufProtobuf(driver, nameSalt), clean=True, run_in_confluent=False,
+ run_in_apache=False
)),
("TestSnowpipeStreamingStringJson", EndToEndTestSuite(
- test_instance=TestSnowpipeStreamingStringJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSnowpipeStreamingStringJson(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestSnowpipeStreamingStringJsonDLQ", EndToEndTestSuite(
- test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestSnowpipeStreamingStringAvro", EndToEndTestSuite(
- test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
+ test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=False
)),
("TestMultipleTopicToOneTableSnowpipeStreaming", EndToEndTestSuite(
- test_instance=TestMultipleTopicToOneTableSnowpipeStreaming(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestMultipleTopicToOneTableSnowpipeStreaming(driver, nameSalt), clean=True,
+ run_in_confluent=True, run_in_apache=True
)),
("TestMultipleTopicToOneTableSnowpipe", EndToEndTestSuite(
- test_instance=TestMultipleTopicToOneTableSnowpipe(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestMultipleTopicToOneTableSnowpipe(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestSchemaMapping", EndToEndTestSuite(
- test_instance=TestSchemaMapping(driver, nameSalt), clean=True, run_in_confluent=True,run_in_apache=True
+ test_instance=TestSchemaMapping(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestSnowpipeStreamingSchemaMappingDLQ", EndToEndTestSuite(
- test_instance=TestSnowpipeStreamingSchemaMappingDLQ(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSnowpipeStreamingSchemaMappingDLQ(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestAutoTableCreation", EndToEndTestSuite(
- test_instance=TestAutoTableCreation(driver, nameSalt, schemaRegistryAddress, testSet), clean=True, run_in_confluent=True, run_in_apache=False
+ test_instance=TestAutoTableCreation(driver, nameSalt, schemaRegistryAddress, testSet), clean=True,
+ run_in_confluent=True, run_in_apache=False
)),
("TestAutoTableCreationTopic2Table", EndToEndTestSuite(
- test_instance=TestAutoTableCreationTopic2Table(driver, nameSalt, schemaRegistryAddress, testSet), clean=True, run_in_confluent=True, run_in_apache=False
+ test_instance=TestAutoTableCreationTopic2Table(driver, nameSalt, schemaRegistryAddress, testSet),
+ clean=True, run_in_confluent=True, run_in_apache=False
)),
("TestSchemaEvolutionJson", EndToEndTestSuite(
- test_instance=TestSchemaEvolutionJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSchemaEvolutionJson(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestSchemaEvolutionAvroSR", EndToEndTestSuite(
- test_instance=TestSchemaEvolutionAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
+ test_instance=TestSchemaEvolutionAvroSR(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=False
)),
("TestSchemaEvolutionWithAutoTableCreationJson", EndToEndTestSuite(
- test_instance=TestSchemaEvolutionWithAutoTableCreationJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSchemaEvolutionWithAutoTableCreationJson(driver, nameSalt), clean=True,
+ run_in_confluent=True, run_in_apache=True
)),
("TestSchemaEvolutionWithAutoTableCreationAvroSR", EndToEndTestSuite(
- test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False
+ test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True,
+ run_in_confluent=True, run_in_apache=False
)),
("TestSchemaEvolutionNonNullableJson", EndToEndTestSuite(
- test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestSchemaNotSupportedConverter", EndToEndTestSuite(
- test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestSchemaEvolutionDropTable", EndToEndTestSuite(
- test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestKcDeleteCreate", EndToEndTestSuite(
test_instance=TestKcDeleteCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestKcDeleteCreateChaos", EndToEndTestSuite(
- test_instance=TestKcDeleteCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestKcDeleteCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestKcDeleteResume", EndToEndTestSuite(
test_instance=TestKcDeleteResume(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestKcDeleteResumeChaos", EndToEndTestSuite(
- test_instance=TestKcDeleteResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestKcDeleteResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestKcPauseCreate", EndToEndTestSuite(
test_instance=TestKcPauseCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestKcPauseCreateChaos", EndToEndTestSuite(
- test_instance=TestKcPauseCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestKcPauseCreateChaos(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestKcPauseResume", EndToEndTestSuite(
test_instance=TestKcPauseResume(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
("TestKcPauseResumeChaos", EndToEndTestSuite(
- test_instance=TestKcPauseResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
+ test_instance=TestKcPauseResumeChaos(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
)),
("TestKcRecreate", EndToEndTestSuite(
test_instance=TestKcRecreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
@@ -207,7 +221,9 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
("TestKcRestart", EndToEndTestSuite(
test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True
)),
+ ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite(
+ test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True,
+ run_in_apache=True
+ )),
])
return test_suites
-
-