Merge branch 'master' into alhuang-oauth
sfc-gh-alhuang committed Aug 10, 2023
2 parents 0b02275 + e56154f commit 22be9a3
Showing 16 changed files with 467 additions and 187 deletions.
8 changes: 4 additions & 4 deletions pom.xml
@@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>1.9.3</version>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
@@ -334,7 +334,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
@@ -369,12 +369,12 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.1</version>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4.2</version>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
10 changes: 5 additions & 5 deletions pom_confluent.xml
@@ -12,7 +12,7 @@

<groupId>com.snowflake</groupId>
<artifactId>snowflake-kafka-connector</artifactId>
<version>1.9.3</version>
<version>2.0.0</version>
<packaging>jar</packaging>
<name>Snowflake Kafka Connector</name>
<description>Snowflake Kafka Connect Sink Connector</description>
@@ -386,7 +386,7 @@
<dependency>
<groupId>net.snowflake</groupId>
<artifactId>snowflake-ingest-sdk</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
<exclusions>
<exclusion>
<groupId>net.snowflake</groupId>
@@ -420,13 +420,13 @@
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.1</version>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.4.2</version>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
@@ -164,7 +164,7 @@ public class SnowflakeSinkConnectorConfig {
"enable.streaming.client.optimization";
public static final String ENABLE_STREAMING_CLIENT_OPTIMIZATION_DISPLAY =
"Enable streaming client optimization";
public static final boolean ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT = false;
public static final boolean ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT = true;
public static final String ENABLE_STREAMING_CLIENT_OPTIMIZATION_DOC =
"Whether to optimize the streaming client to reduce cost. Note that this may affect"
+ " throughput or latency and can only be set if Streaming Snowpipe is enabled";
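
The hunk above flips ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT from false to true, so connectors that never set enable.streaming.client.optimization now get the streaming client optimization by default. Below is a minimal, hypothetical sketch of opting back out in a sink configuration; only the optimization flag (and the name key defined in Utils) comes from this diff, the other property keys and values are illustrative assumptions.

import java.util.HashMap;
import java.util.Map;

public class StreamingOptimizationOptOut {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("name", "my_snowflake_sink");                        // connector name (illustrative)
    props.put("snowflake.ingestion.method", "SNOWPIPE_STREAMING"); // assumed ingestion-method key/value
    // As of 2.0.0 the optimization defaults to true; override it explicitly to keep the 1.9.x behavior.
    props.put("enable.streaming.client.optimization", "false");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}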
2 changes: 1 addition & 1 deletion src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -70,7 +70,7 @@
public class Utils {

// Connector version, change every release
public static final String VERSION = "1.9.3";
public static final String VERSION = "2.0.0";

// connector parameter list
public static final String NAME = "name";
@@ -108,7 +108,7 @@ public void startTask(final String tableName, final TopicPartition topicPartition
String stageName = Utils.stageName(conn.getConnectorName(), tableName);
String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
if (pipes.containsKey(nameIndex)) {
LOGGER.error("task is already registered, name: {}", nameIndex);
LOGGER.warn("task is already registered with {} partition", nameIndex);
} else {
String pipeName =
Utils.pipeName(conn.getConnectorName(), tableName, topicPartition.partition());
@@ -266,7 +266,9 @@ public long getOffset(TopicPartition topicPartition) {
String partitionChannelKey =
partitionChannelKey(topicPartition.topic(), topicPartition.partition());
if (partitionsToChannel.containsKey(partitionChannelKey)) {
return partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
long offset = partitionsToChannel.get(partitionChannelKey).getOffsetSafeToCommitToKafka();
partitionsToChannel.get(partitionChannelKey).setLatestConsumerOffset(offset);
return offset;
} else {
LOGGER.warn(
"Topic: {} Partition: {} hasn't been initialized to get offset",
@@ -302,15 +304,14 @@ public void closeAll() {
/**
* This function is called during rebalance.
*
* <p>We don't close the channels. Upon rebalance, (inside {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel
* anyways. [Check c'tor of {@link TopicPartitionChannel}]
* <p>All the channels are closed. The client is still active. Upon rebalance, (inside {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we will reopen the channel.
*
* <p>We will wipe the cache partitionsToChannel so that in {@link
* com.snowflake.kafka.connector.SnowflakeSinkTask#open(Collection)} we reinstantiate and fetch
* offsetToken
*
* @param partitions a list of topic partitions to close/shutdown
* @param partitions a list of topic partition
*/
@Override
public void close(Collection<TopicPartition> partitions) {
@@ -322,17 +323,21 @@ public void close(Collection<TopicPartition> partitions) {
partitionsToChannel.get(partitionChannelKey);
// Check for null since it's possible that the something goes wrong even before the
// channels are created
if (topicPartitionChannel != null) {
topicPartitionChannel.closeChannel();
}
LOGGER.info(
"Removing partitionChannel:{}, partition:{}, topic:{} from map(partitionsToChannel)",
"Closing partitionChannel:{}, partition:{}, topic:{}",
topicPartitionChannel == null ? null : topicPartitionChannel.getChannelName(),
topicPartition.topic(),
topicPartition.partition());
partitionsToChannel.remove(partitionChannelKey);
});
LOGGER.info(
"Closing {} partitions and Clearing partitionsToChannel Map of size:{}",
"Closing {} partitions and remaining partitions which are not closed are:{}, with size:{}",
partitions.size(),
partitionsToChannel.keySet().toString(),
partitionsToChannel.size());
partitionsToChannel.clear();
}

@Override
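
The close() changes in the hunk above mean that on rebalance each streaming channel is now explicitly closed before the partitionsToChannel cache is cleared, while the streaming client itself stays alive; SnowflakeSinkTask#open then re-creates the channels and re-fetches the committed offset token. A minimal standalone sketch of that close-then-clear pattern, using a toy Channel type rather than the connector's real classes:

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RebalanceCloseSketch {
  static class Channel {
    final String name;
    Channel(String name) { this.name = name; }
    void closeChannel() { System.out.println("closed channel " + name); }
  }

  final Map<String, Channel> partitionsToChannel = new ConcurrentHashMap<>();

  // Called when partitions are revoked: close each channel (null-safe, since a
  // failure during startup may have left no channel behind), then wipe the cache
  // so reassignment re-creates the channels and re-reads the offset token.
  void close(Collection<String> partitionChannelKeys) {
    partitionChannelKeys.forEach(
        key -> {
          Channel channel = partitionsToChannel.get(key);
          if (channel != null) {
            channel.closeChannel();
          }
        });
    partitionsToChannel.clear();
  }

  public static void main(String[] args) {
    RebalanceCloseSketch sketch = new RebalanceCloseSketch();
    sketch.partitionsToChannel.put("topicA_0", new Channel("topicA_0"));
    sketch.close(Arrays.asList("topicA_0", "topicA_1")); // topicA_1 has no channel: skipped safely
    System.out.println("remaining channels: " + sketch.partitionsToChannel.size()); // 0
  }
}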
@@ -83,17 +83,24 @@ public class TopicPartitionChannel {

// -------- private final fields -------- //

// This offset is updated when Snowflake has received offset from insertRows API
// We will update this value after calling offsetToken API for this channel
// This offset represents the data persisted in Snowflake. More specifically it is the Snowflake
// offset determined from the insertRows API call. It is set after calling the fetchOffsetToken
// API for this channel
private final AtomicLong offsetPersistedInSnowflake =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

// This offset is updated every time KC processed an offset, it's used to make sure that we won't
// process records that are already processed. It will be reset to the latest committed token
// every time we fetch it from Snowflake
// This offset represents the data buffered in KC. More specifically it is the KC offset to ensure
// exactly once functionality. On creation it is set to the latest committed token in Snowflake
// (see offsetPersistedInSnowflake) and updated on each new row from KC.
private final AtomicLong processedOffset =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

// This offset is a fallback to represent the data buffered in KC. It is similar to
// processedOffset, however it is only used to resend the offset when the channel offset token is
// NULL. It is updated to the first offset sent by KC (see processedOffset) or the offset
// persisted in Snowflake (see offsetPersistedInSnowflake)
private long latestConsumerOffset = NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;

/**
* Offsets are reset in kafka when one of following cases arises in which we rely on source of
* truth (Which is Snowflake's committed offsetToken)
@@ -263,6 +270,11 @@ public TopicPartitionChannel(
this.processedOffset.set(lastCommittedOffsetToken);
if (lastCommittedOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.sinkTaskContext.offset(this.topicPartition, lastCommittedOffsetToken + 1L);
} else {
LOGGER.info(
"TopicPartitionChannel:{}, offset token is NULL, will rely on Kafka to send us the"
+ " correct offset instead",
this.getChannelName());
}
}

@@ -281,6 +293,11 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get();
final long currentProcessedOffset = this.processedOffset.get();

// Set the consumer offset to be the first record that Kafka sends us
if (latestConsumerOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
latestConsumerOffset = kafkaSinkRecord.kafkaOffset();
}

// Ignore adding to the buffer until we see the expected offset value
if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, currentProcessedOffset)) {
return;
@@ -345,7 +362,10 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) {
*/
private boolean shouldIgnoreAddingRecordToBuffer(
SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
if (!isOffsetResetInKafka) {
// Don't skip rows if there is no offset reset or there is no offset token information in the
// channel
if (!isOffsetResetInKafka
|| currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
return false;
}

@@ -496,13 +516,8 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert
// Due to schema evolution, we may need to reopen the channel and reset the offset in kafka
// since it's possible that not all rows are ingested
if (response.needToResetOffset()) {
final long offsetRecoveredFromSnowflake =
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
// If there is no valid offset token at server side even after the reset, retry it again
if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
insertBufferedRecords(streamingBufferToInsert);
}
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
}
return response;
} catch (TopicPartitionChannelInsertionException ex) {
@@ -639,9 +654,6 @@ public InsertRowsResponse get() throws Throwable {
}
}
}
// TODO SNOW-758492: for schematization, sleep a few seconds after the insert to avoid
// combining channels with different schemas
Thread.sleep(2000);
}
return new InsertRowsResponse(finalResponse, needToResetOffset);
}
@@ -877,20 +889,25 @@ private long streamingApiFallbackSupplier(
private void resetChannelMetadataAfterRecovery(
final StreamingApiFallbackInvoker streamingApiFallbackInvoker,
final long offsetRecoveredFromSnowflake) {
// If we don't get a valid offset token from server side, reset the processed offset to invalid
// and rely on kafka to send us the correct data
if (offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.offsetPersistedInSnowflake.set(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
this.processedOffset.set(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
LOGGER.warn(
"{} Channel:{}, OffsetRecoveredFromSnowflake:{}, skip reset kafka offset",
LOGGER.info(
"{} Channel:{}, offset token is NULL, will use the consumer offset managed by the"
+ " connector instead, consumer offset:{}",
streamingApiFallbackInvoker,
this.getChannelName(),
offsetRecoveredFromSnowflake);
latestConsumerOffset);
}

// If the offset token in the channel is null, use the consumer offset managed by the connector;
// otherwise use the offset token stored in the channel
final long offsetToResetInKafka =
offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
? latestConsumerOffset
: offsetRecoveredFromSnowflake + 1L;
if (offsetToResetInKafka == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
return;
}

final long offsetToResetInKafka = offsetRecoveredFromSnowflake + 1L;
// reset the buffer
this.bufferLock.lock();
try {
@@ -916,7 +933,7 @@ private void resetChannelMetadataAfterRecovery(
this.bufferLock.unlock();
}
LOGGER.warn(
"{} Channel:{}, OffsetRecoveredFromSnowflake:{}, Reset kafka offset to:{}",
"{} Channel:{}, OffsetRecoveredFromSnowflake:{}, reset kafka offset to:{}",
streamingApiFallbackInvoker,
this.getChannelName(),
offsetRecoveredFromSnowflake,
@@ -954,26 +971,15 @@ private long getRecoveredOffsetFromSnowflake(
* snowflake.
*/
private long fetchLatestCommittedOffsetFromSnowflake() {
LOGGER.debug(
"Fetching last committed offset for partition channel:{}",
this.channel.getFullyQualifiedName());
LOGGER.debug("Fetching last committed offset for partition channel:{}", this.getChannelName());
String offsetToken = null;
try {
offsetToken = this.channel.getLatestCommittedOffsetToken();
if (offsetToken == null) {
LOGGER.warn(
"OffsetToken not present for channelName:{}, will rely on kafka consumer offset as"
+ " source of truth",
this.getChannelName());
return NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
} else {
long latestCommittedOffsetInSnowflake = Long.parseLong(offsetToken);
LOGGER.info(
"Fetched offsetToken:{} for channelName:{}",
latestCommittedOffsetInSnowflake,
this.channel.getFullyQualifiedName());
return latestCommittedOffsetInSnowflake;
}
LOGGER.info(
"Fetched offsetToken for channelName:{}, offset:{}", this.getChannelName(), offsetToken);
return offsetToken == null
? NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
: Long.parseLong(offsetToken);
} catch (NumberFormatException ex) {
LOGGER.error(
"The offsetToken string does not contain a parsable long:{} for channel:{}",
@@ -1075,6 +1081,12 @@ protected SnowflakeTelemetryService getTelemetryServiceV2() {
return this.telemetryServiceV2;
}

protected void setLatestConsumerOffset(long consumerOffset) {
if (consumerOffset > this.latestConsumerOffset) {
this.latestConsumerOffset = consumerOffset;
}
}

/**
* Converts the original kafka sink record into a Json Record. i.e key and values are converted
* into Json so that it can be used to insert into variant column of Snowflake Table.
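
The TopicPartitionChannel changes above add latestConsumerOffset as a fallback for the case where the channel's offset token in Snowflake is NULL: instead of skipping the Kafka offset reset, the connector now seeks back to the consumer offset it has tracked itself. A small standalone sketch of just that selection rule, assuming NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE is the usual -1 sentinel (an assumption, not stated in this diff):

public class OffsetResetRuleSketch {
  // Assumed sentinel meaning "no offset token registered in Snowflake".
  static final long NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE = -1L;

  // Mirrors the rule in resetChannelMetadataAfterRecovery: if the recovered offset
  // token is NULL, fall back to the consumer offset tracked by the connector;
  // otherwise resume one past the last offset persisted in Snowflake. Returns the
  // sentinel when there is nothing to reset to (the caller simply returns early).
  static long offsetToResetInKafka(long offsetRecoveredFromSnowflake, long latestConsumerOffset) {
    return offsetRecoveredFromSnowflake == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
        ? latestConsumerOffset
        : offsetRecoveredFromSnowflake + 1L;
  }

  public static void main(String[] args) {
    System.out.println(offsetToResetInKafka(-1L, 42L)); // 42  -> seek to tracked consumer offset
    System.out.println(offsetToResetInKafka(99L, 42L)); // 100 -> resume after Snowflake's token
    System.out.println(offsetToResetInKafka(-1L, -1L)); // -1  -> nothing to reset, skip the seek
  }
}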
@@ -9,27 +9,17 @@
public abstract class SnowflakeTelemetryBasicInfo {
final String tableName;

/**
* User created or user visible stage name.
*
* <p>It is null for Snowpipe Streaming Telemetry.
*/
final String stageName;

static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());

/**
* Base Constructor. Accepts a tableName and StageName.
*
* @param tableName Checks for Nullability
* @param stageName Can be null (In case of Snowpipe Streaming since there is no user visible
* Snowflake Stage)
*/
public SnowflakeTelemetryBasicInfo(final String tableName, final String stageName) {
public SnowflakeTelemetryBasicInfo(final String tableName) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty");
this.tableName = tableName;
this.stageName = stageName;
}

/**
Expand Up @@ -24,11 +24,13 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo
int fileCountReprocessPurge =
0; // files on stage that are purged due to reprocessing when cleaner starts
long startTime; // start time of the pipe
private final String stageName;
private final String pipeName;

public SnowflakeTelemetryPipeCreation(
final String tableName, final String stageName, final String pipeName) {
super(tableName, stageName);
super(tableName);
this.stageName = stageName;
this.pipeName = pipeName;
this.startTime = System.currentTimeMillis();
}