
Commit 89b49a5

CONN-10511 Automatic failover for Snowpipe Streaming (#1238)
Co-authored-by: Lukasz Kucharski <[email protected]>
1 parent 0a98899 · commit 89b49a5

5 files changed: +85 -7 lines

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java

Lines changed: 8 additions & 0 deletions
@@ -349,4 +349,12 @@ ChannelMigrateOffsetTokenResponseDTO migrateStreamingChannelOffsetToken(
    * otherwise.
    */
   Optional<List<DescribeTableRow>> describeTable(String tableName);
+
+  /**
+   * Checks if the underlying JDBC connection is valid.
+   *
+   * @param timeoutSeconds the timeout in seconds to wait for the validation operation to complete
+   * @return true if the connection is valid, false otherwise
+   */
+  boolean isValid(int timeoutSeconds);
 }

src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java

Lines changed: 10 additions & 0 deletions
@@ -1221,6 +1221,16 @@ protected ChannelMigrateOffsetTokenResponseDTO getChannelMigrateOffsetTokenRespo
     return channelMigrateOffsetTokenResponseDTO;
   }
 
+  @Override
+  public boolean isValid(int timeoutSeconds) {
+    try {
+      return this.conn != null && this.conn.isValid(timeoutSeconds);
+    } catch (SQLException e) {
+      LOGGER.warn("Error checking connection validity: {}", e.getMessage());
+      return false;
+    }
+  }
+
   public static class FormattingUtils {
     /**
      * Transform the objectName to uppercase unless it is enclosed in double quotes
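
Note: the implementation above delegates to the standard java.sql.Connection#isValid(int) contract, where a timeout of 0 means no timeout is applied to the validation operation and a negative value makes the driver throw SQLException. A minimal standalone sketch of probing a connection this way, assuming the Snowflake JDBC driver is on the classpath; the URL and credentials are placeholders, not values from this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionProbe {
  public static void main(String[] args) throws SQLException {
    // Placeholder URL/credentials; in the connector these come from the connector config.
    try (Connection conn =
        DriverManager.getConnection(
            "jdbc:snowflake://<account>.snowflakecomputing.com", "<user>", "<password>")) {
      // isValid(5): the driver may run a validation query and gives up after 5 seconds.
      System.out.println("valid: " + conn.isValid(5));
    }
  }
}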

src/main/java/com/snowflake/kafka/connector/internal/streaming/OpenChannelRetryPolicy.java

Lines changed: 32 additions & 4 deletions
@@ -13,13 +13,21 @@
  * backoff and jitter.
  *
  * <p>This class provides a clean interface to execute channel opening operations with automatic
- * retry on HTTP 429 (rate limiting) errors from Snowflake streaming service.
+ * retry on:
+ *
+ * <ul>
+ *   <li>HTTP 429 (rate limiting) errors from Snowflake streaming service
+ *   <li>Status code 47 (secondary database) errors during failover scenarios
+ * </ul>
  */
 class OpenChannelRetryPolicy {
 
   private static final KCLogger LOGGER = new KCLogger(OpenChannelRetryPolicy.class.getName());
 
   private static final String RATE_LIMIT_MESSAGE_PART = "HTTP Status: 429";
+  private static final String SECONDARY_DATABASE_MESSAGE_PART =
+      "Cannot ingest data into a table that is part of a secondary database";
+  private static final String SECONDARY_DATABASE_STATUS_CODE = "\"status_code\" : 47";
 
   // Retry policy constants
   /** Initial delay before the first retry attempt. */
@@ -37,8 +45,14 @@ class OpenChannelRetryPolicy {
   /**
    * Executes the provided channel opening action with retry handling.
    *
-   * <p>On SFException containing "429" (HTTP rate limiting), it will retry with exponential backoff
-   * and jitter with unlimited retry attempts. Other exceptions are not retried.
+   * <p>Retries with exponential backoff and jitter with unlimited retry attempts on:
+   *
+   * <ul>
+   *   <li>SFException containing "HTTP Status: 429" (rate limiting)
+   *   <li>SFException containing status code 47 (secondary database during failover)
+   * </ul>
+   *
+   * <p>Other exceptions are not retried.
    *
    * @param channelOpenAction the action to execute (typically openChannelForTable call)
    * @param channelName the channel name for logging purposes
@@ -66,6 +80,20 @@ static SnowflakeStreamingIngestChannel executeWithRetry(
   }
 
   private static boolean isRetryableError(Throwable e) {
-    return e instanceof SFException && e.getMessage().contains(RATE_LIMIT_MESSAGE_PART);
+    if (!(e instanceof SFException)) {
+      return false;
+    }
+
+    String message = e.getMessage();
+
+    // Retry on HTTP 429 (rate limiting)
+    boolean isRateLimitingError = message.contains(RATE_LIMIT_MESSAGE_PART);
+    // Retry on status code 47 (secondary database during failover)
+    boolean isFailoverError =
+        message.contains(SECONDARY_DATABASE_STATUS_CODE)
+            || message.contains(SECONDARY_DATABASE_MESSAGE_PART);
+
+    return isRateLimitingError || isFailoverError;
   }
 }
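
The body of executeWithRetry is not part of this diff. As a rough illustration of the documented behavior (unlimited retries with exponential backoff and jitter on retryable errors), here is a self-contained sketch; the delay constants, the ChannelOpenAction interface, and the simplified message check are assumptions for illustration, not the connector's actual code:

import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch only: retries an action forever while the error is
// retryable, sleeping with exponential backoff plus full jitter between attempts.
final class RetrySketch {
  private static final Duration INITIAL_DELAY = Duration.ofSeconds(1); // assumed value
  private static final Duration MAX_DELAY = Duration.ofMinutes(1); // assumed value

  interface ChannelOpenAction<T> {
    T open() throws Exception;
  }

  static <T> T executeWithRetry(ChannelOpenAction<T> action, String channelName)
      throws InterruptedException {
    long delayMillis = INITIAL_DELAY.toMillis();
    while (true) {
      try {
        return action.open();
      } catch (Exception e) {
        if (!isRetryableError(e)) {
          throw new RuntimeException(e); // non-retryable: propagate immediately
        }
        // Full jitter: sleep a random duration in [0, delayMillis).
        Thread.sleep(ThreadLocalRandom.current().nextLong(delayMillis));
        // Exponential backoff, capped at MAX_DELAY.
        delayMillis = Math.min(delayMillis * 2, MAX_DELAY.toMillis());
      }
    }
  }

  private static boolean isRetryableError(Throwable e) {
    // Stand-in for the connector's check shown in the diff above.
    String m = String.valueOf(e.getMessage());
    return m.contains("HTTP Status: 429") || m.contains("\"status_code\" : 47");
  }
}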

src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java

Lines changed: 33 additions & 1 deletion
@@ -14,6 +14,7 @@
 import com.snowflake.kafka.connector.dlq.KafkaRecordErrorReporter;
 import com.snowflake.kafka.connector.internal.KCLogger;
 import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
+import com.snowflake.kafka.connector.internal.SnowflakeConnectionServiceFactory;
 import com.snowflake.kafka.connector.internal.SnowflakeErrors;
 import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
 import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
@@ -60,7 +61,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
   private static final KCLogger LOGGER = new KCLogger(SnowflakeSinkServiceV2.class.getName());
 
   // Used to connect to Snowflake, could be null during testing
-  private final SnowflakeConnectionService conn;
+  private volatile SnowflakeConnectionService conn;
 
   private final RecordService recordService;
   private final SnowflakeTelemetryService telemetryService;
@@ -350,6 +351,11 @@ public void insert(SinkRecord record) {
           "Topic: {} Partition: {} hasn't been initialized by OPEN function",
           record.topic(),
           record.kafkaPartition());
+
+      // Check connection validity and recreate if needed before starting partition
+      // Needed to handle failover scenario
+      recreateInvalidConnection();
+
       startPartition(
           Utils.tableName(record.topic(), this.topicToTableMap),
           new TopicPartition(record.topic(), record.kafkaPartition()));
@@ -714,4 +720,30 @@ private void populateSchemaEvolutionPermissions(String tableName) {
       }
     }
   }
+
+  private void recreateInvalidConnection() {
+    try {
+      // Check if connection is null, closed, or invalid
+      boolean shouldRecreate = false;
+      if (conn == null || conn.isClosed()) {
+        shouldRecreate = true;
+      } else if (!conn.isValid(5)) {
+        shouldRecreate = true;
+        try {
+          conn.close();
+        } catch (Exception e) {
+          LOGGER.warn("Could not close the old connection before opening the new one.", e);
+        }
+      }
+      if (shouldRecreate) {
+        LOGGER.warn("Connection is invalid, attempting to recreate");
+        this.conn =
+            SnowflakeConnectionServiceFactory.builder().setProperties(connectorConfig).build();
+
+        LOGGER.info("Successfully recreated Snowflake connection");
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to recreate connection: {}", e.getMessage());
+    }
+  }
 }
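
The recreation decision above reduces to a small state check: rebuild when the connection is null, closed, or fails validation (as happens after a failover, when the session points at what is now a secondary replica). A standalone restatement of that logic against plain JDBC, with illustrative names that are not the connector's API:

import java.sql.Connection;
import java.sql.SQLException;

// Illustrative restatement of the recreation check, against plain java.sql.Connection.
final class FailoverCheck {
  /** Returns true when the caller should discard {@code conn} and build a fresh connection. */
  static boolean shouldRecreate(Connection conn, int timeoutSeconds) {
    try {
      if (conn == null || conn.isClosed()) {
        return true; // nothing usable to keep
      }
      // isValid() may run a driver-level validation query; it returns false
      // once the underlying session is no longer usable.
      return !conn.isValid(timeoutSeconds);
    } catch (SQLException e) {
      return true; // treat inspection failures as "connection unusable"
    }
  }
}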

test/connect-log4j.properties

Lines changed: 2 additions & 2 deletions
@@ -43,5 +43,5 @@ log4j.logger.org.apache.zookeeper=ERROR
 log4j.logger.org.reflections=ERROR
 
 # enable debug level logging for SF KC and Ingest SDK
-log4j.logger.com.snowflake.kafka.connector=DEBUG
-log4j.logger.net.snowflake=DEBUG
+log4j.logger.com.snowflake.kafka.connector=INFO
+log4j.logger.net.snowflake=INFO
