Skip to content

Commit 00901f1

Browse files
FLOW-7792 React to error deltas instead of error absolute value (#1240)
1 parent d035b1e commit 00901f1

File tree

7 files changed

+156
-18
lines changed

7 files changed

+156
-18
lines changed

src/main/java/com/snowflake/kafka/connector/internal/streaming/v2/SnowpipeStreamingPartitionChannel.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public class SnowpipeStreamingPartitionChannel implements TopicPartitionChannel
6565

6666
private long lastAppendRowsOffset = NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
6767

68+
// Tracks the initial error count when the channel was opened.
69+
// Used to detect NEW errors (current error count > initial error count) since error counts
70+
// are cumulative and don't reset when a channel is reopened.
71+
private long initialErrorCount = 0;
72+
6873
// Indicates whether we need to skip and discard any leftover rows in the current batch, this
6974
// could happen when the channel gets invalidated and reset
7075
private boolean needToSkipCurrentBatch = false;
@@ -490,23 +495,28 @@ private long fetchLatestOffsetFromChannel(SnowflakeStreamingIngestChannel channe
490495
*
491496
* @return new channel which was fetched after open/reopen
492497
*/
493-
private SnowflakeStreamingIngestChannel openChannelForTable(String channelName) {
494-
SnowflakeStreamingIngestClient streamingIngestClient =
498+
private SnowflakeStreamingIngestChannel openChannelForTable(final String channelName) {
499+
final SnowflakeStreamingIngestClient streamingIngestClient =
495500
StreamingClientManager.getClient(
496501
connectorName, taskId, pipeName, connectorConfig, streamingClientProperties);
497-
OpenChannelResult result = streamingIngestClient.openChannel(channelName, null);
498-
if (result.getChannelStatus().getStatusCode().equals("SUCCESS")) {
499-
LOGGER.info("Successfully opened streaming channel: {}", channelName);
502+
final OpenChannelResult result = streamingIngestClient.openChannel(channelName, null);
503+
final ChannelStatus channelStatus = result.getChannelStatus();
504+
if (channelStatus.getStatusCode().equals("SUCCESS")) {
505+
// Capture the initial error count - errors are cumulative and don't reset on channel reopen.
506+
// We only want to fail on NEW errors that occur after the channel was opened.
507+
this.initialErrorCount = channelStatus.getRowsErrorCount();
508+
LOGGER.info(
509+
"Successfully opened streaming channel: {}, initialErrorCount: {}",
510+
channelName,
511+
this.initialErrorCount);
500512
return result.getChannel();
501513
} else {
502514
LOGGER.error(
503-
"Failed to open channel: {}, error code: {}",
504-
channelName,
505-
result.getChannelStatus().getStatusCode());
515+
"Failed to open channel: {}, error code: {}", channelName, channelStatus.getStatusCode());
506516
throw ERROR_5028.getException(
507517
String.format(
508518
"Failed to open channel %s. Error code %s",
509-
channelName, result.getChannelStatus().getStatusCode()));
519+
channelName, channelStatus.getStatusCode()));
510520
}
511521
}
512522

@@ -633,24 +643,36 @@ private void logChannelStatus(final ChannelStatus status) {
633643
}
634644

635645
private void handleChannelErrors(final ChannelStatus status, final boolean tolerateErrors) {
636-
final long rowsErrorCount = status.getRowsErrorCount();
637-
if (rowsErrorCount > 0) {
646+
final long currentErrorCount = status.getRowsErrorCount();
647+
// Error counts are cumulative and don't reset when a channel is reopened.
648+
// Only fail if there are NEW errors that occurred after the channel was opened.
649+
final long newErrorCount = currentErrorCount - this.initialErrorCount;
650+
651+
if (newErrorCount > 0) {
638652
final String errorMessage =
639653
String.format(
640-
"Channel [%s] has %d errors. Last error message: %s, last error timestamp: %s,"
641-
+ " last error offset token upper bound: %s",
654+
"Channel [%s] has %d new errors (total: %d, initial: %d). Last error message: %s,"
655+
+ " last error timestamp: %s, last error offset token upper bound: %s",
642656
this.getChannelNameFormatV1(),
643-
rowsErrorCount,
657+
newErrorCount,
658+
currentErrorCount,
659+
this.initialErrorCount,
644660
status.getLastErrorMessage(),
645661
status.getLastErrorTimestamp(),
646662
status.getLastErrorOffsetTokenUpperBound());
647663

664+
this.initialErrorCount = currentErrorCount;
648665
if (tolerateErrors) {
649666
LOGGER.warn(errorMessage);
650667
} else {
651668
this.telemetryService.reportKafkaConnectFatalError(errorMessage);
652669
throw ERROR_5030.getException(errorMessage);
653670
}
671+
} else if (currentErrorCount > 0) {
672+
LOGGER.debug(
673+
"Channel [{}] has {} pre-existing errors from before connector startup (no new errors)",
674+
this.getChannelNameFormatV1(),
675+
currentErrorCount);
654676
}
655677
}
656678

src/test/java/com/snowflake/kafka/connector/internal/streaming/ChannelStatusCheckIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,71 @@ void shouldContinueWorkingWhenChannelHasErrorsAndToleranceIsAll() throws JsonPro
174174
"All tasks should remain running when errors.tolerance=all");
175175
}
176176

177+
@Test
178+
void shouldContinueWorkingWithPreExistingErrorsAndToleranceIsNone()
179+
throws JsonProcessingException {
180+
// Given: Pre-existing errors are set BEFORE the connector starts (simulating channel reopen
181+
// scenario)
182+
// This simulates the case where a channel has cumulative errors from a previous connector run
183+
fakeClientSupplier.setPreExistingErrorCount(5);
184+
185+
Map<String, String> config = defaultProperties(topicName, connectorName);
186+
config.put(KafkaConnectorConfigParams.ERRORS_TOLERANCE_CONFIG, "none");
187+
connectCluster.configureConnector(connectorName, config);
188+
waitForConnectorRunning(connectorName);
189+
190+
FakeSnowflakeStreamingIngestClient fakeClient = waitForConnectorToOpenChannels(connectorName);
191+
192+
// Produce messages
193+
produceMessages(5);
194+
195+
// Then: connector should remain running because pre-existing errors don't count as new errors
196+
await("Messages processed despite pre-existing errors")
197+
.atMost(Duration.ofSeconds(30))
198+
.until(() -> fakeClient.getAppendedRowCount() >= 5);
199+
200+
ConnectorStateInfo connectorState = connectCluster.connectorStatus(connectorName);
201+
assertTrue(
202+
connectorState.tasks().stream().allMatch(task -> "RUNNING".equals(task.state())),
203+
"All tasks should be running when there are only pre-existing errors");
204+
}
205+
206+
@Test
207+
void shouldFailWhenNewErrorsOccurAfterStartupWithPreExistingErrors()
208+
throws JsonProcessingException {
209+
// Given: Pre-existing errors are set BEFORE the connector starts
210+
fakeClientSupplier.setPreExistingErrorCount(5);
211+
212+
Map<String, String> config = defaultProperties(topicName, connectorName);
213+
connectCluster.configureConnector(connectorName, config);
214+
waitForConnectorRunning(connectorName);
215+
216+
FakeSnowflakeStreamingIngestClient fakeClient = waitForConnectorToOpenChannels(connectorName);
217+
218+
// Produce initial message
219+
produceMessages(1);
220+
await("Initial message processed")
221+
.atMost(Duration.ofSeconds(30))
222+
.until(() -> fakeClient.getAppendedRowCount() >= 1);
223+
224+
// When: NEW errors occur (error count increases from 5 to 10)
225+
for (FakeSnowflakeStreamingIngestChannel channel : fakeClient.getOpenedChannels()) {
226+
ChannelStatus statusWithNewErrors =
227+
createChannelStatusWithErrors(channel.getChannelName(), 10);
228+
channel.setChannelStatus(statusWithNewErrors);
229+
}
230+
231+
// Then: connector task should fail due to NEW channel errors
232+
await("Connector task failed due to new errors")
233+
.atMost(Duration.ofMinutes(2))
234+
.pollInterval(Duration.ofSeconds(4))
235+
.until(
236+
() -> {
237+
ConnectorStateInfo state = connectCluster.connectorStatus(connectorName);
238+
return state.tasks().stream().anyMatch(task -> "FAILED".equals(task.state()));
239+
});
240+
}
241+
177242
private void produceMessages(int count) throws JsonProcessingException {
178243
Map<String, String> payload = Map.of("key1", "value1", "key2", "value2");
179244
for (int i = 0; i < count; i++) {

src/test/java/com/snowflake/kafka/connector/internal/streaming/FakeIngestClientSupplier.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public class FakeIngestClientSupplier implements IngestClientSupplier {
1111
private final ConcurrentHashMap<String, FakeSnowflakeStreamingIngestClient>
1212
pipeToIngestClientMap = new ConcurrentHashMap<>();
1313

14+
private long preExistingErrorCount = 0;
15+
1416
@Override
1517
public SnowflakeStreamingIngestClient get(
1618
final String clientName,
@@ -21,10 +23,27 @@ public SnowflakeStreamingIngestClient get(
2123
final StreamingClientProperties streamingClientProperties) {
2224
return pipeToIngestClientMap.computeIfAbsent(
2325
pipeName,
24-
(key) -> new FakeSnowflakeStreamingIngestClient(pipeName, connectorConfig.get("name")));
26+
(key) -> {
27+
final FakeSnowflakeStreamingIngestClient client =
28+
new FakeSnowflakeStreamingIngestClient(pipeName, connectorConfig.get("name"));
29+
client.setDefaultErrorCount(preExistingErrorCount);
30+
return client;
31+
});
2532
}
2633

2734
public Collection<FakeSnowflakeStreamingIngestClient> getFakeIngestClients() {
2835
return pipeToIngestClientMap.values();
2936
}
37+
38+
/**
39+
* Sets the pre-existing error count that will be applied to all channels when they are opened.
40+
* This simulates the cumulative error count that persists in Snowflake across connector restarts.
41+
*/
42+
public void setPreExistingErrorCount(final long errorCount) {
43+
this.preExistingErrorCount = errorCount;
44+
// Also update existing clients
45+
for (final FakeSnowflakeStreamingIngestClient client : pipeToIngestClientMap.values()) {
46+
client.setDefaultErrorCount(errorCount);
47+
}
48+
}
3049
}

src/test/java/com/snowflake/kafka/connector/internal/streaming/FakeSnowflakeStreamingIngestChannel.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,21 @@ public class FakeSnowflakeStreamingIngestChannel
1919
private final String channelName;
2020
/** Collection of all rows appended to this channel */
2121
private final List<Map<String, Object>> appendedRows;
22+
/** Reference to parent client for sharing error counts across channel reopens */
23+
private final FakeSnowflakeStreamingIngestClient parentClient;
2224

2325
private volatile boolean closed;
2426
private String offsetToken;
2527
private ChannelStatus channelStatus;
2628

27-
public FakeSnowflakeStreamingIngestChannel(String pipeName, String channelName) {
29+
public FakeSnowflakeStreamingIngestChannel(
30+
final String pipeName,
31+
final String channelName,
32+
final FakeSnowflakeStreamingIngestClient parentClient) {
2833
this.pipeName = pipeName;
2934
this.channelName = channelName;
3035
this.appendedRows = new ArrayList<>();
36+
this.parentClient = parentClient;
3137
}
3238

3339
@Override
@@ -109,6 +115,10 @@ public ChannelStatus getChannelStatus() {
109115

110116
public void setChannelStatus(final ChannelStatus channelStatus) {
111117
this.channelStatus = channelStatus;
118+
// Update the shared error count in the parent client so it persists across channel reopens
119+
if (parentClient != null && channelStatus.getRowsErrorCount() > 0) {
120+
parentClient.setInitialErrorCountForChannel(channelName, channelStatus.getRowsErrorCount());
121+
}
112122
}
113123

114124
@Override

src/test/java/com/snowflake/kafka/connector/internal/streaming/FakeSnowflakeStreamingIngestClient.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,29 @@ public class FakeSnowflakeStreamingIngestClient implements SnowflakeStreamingIng
2020
private final List<FakeSnowflakeStreamingIngestChannel> openedChannels =
2121
Collections.synchronizedList(new ArrayList<>());
2222
private final Map<String, String> channelNameToOffsetTokens = new ConcurrentHashMap<>();
23+
// Shared error counts per channel name - persists across channel reopens like real Snowflake
24+
private final Map<String, Long> channelNameToErrorCount = new ConcurrentHashMap<>();
25+
// Default error count to use when no channel-specific count is set
26+
private long defaultErrorCount = 0;
2327
private boolean closed = false;
2428

2529
public FakeSnowflakeStreamingIngestClient(final String pipeName, final String connectorName) {
2630
this.pipeName = pipeName;
2731
this.connectorName = connectorName;
2832
}
2933

34+
public void setDefaultErrorCount(final long errorCount) {
35+
this.defaultErrorCount = errorCount;
36+
}
37+
38+
public void setInitialErrorCountForChannel(final String channelName, final long errorCount) {
39+
channelNameToErrorCount.put(channelName, errorCount);
40+
}
41+
42+
public long getErrorCountForChannel(final String channelName) {
43+
return channelNameToErrorCount.getOrDefault(channelName, defaultErrorCount);
44+
}
45+
3046
@Override
3147
public void close() {
3248
this.closed = true;
@@ -52,6 +68,9 @@ public OpenChannelResult openChannel(final String channelName, final String offs
5268
if (offsetToken != null) {
5369
channelNameToOffsetTokens.put(channelName, offsetToken);
5470
}
71+
// Use the shared error count - this persists across channel reopens like real Snowflake
72+
// Falls back to defaultErrorCount if no channel-specific count is set
73+
final long errorCount = channelNameToErrorCount.getOrDefault(channelName, defaultErrorCount);
5574
final ChannelStatus channelStatus =
5675
new ChannelStatus(
5776
"db",
@@ -63,14 +82,14 @@ public OpenChannelResult openChannel(final String channelName, final String offs
6382
Instant.now(),
6483
0, // rowsInsertedCount
6584
0, // rowsParsedCount
66-
0, // rowsErrorCount - default to 0 (no errors)
85+
errorCount, // rowsErrorCount - use shared error count
6786
null,
6887
null,
6988
null,
7089
null,
7190
Instant.now());
7291
final FakeSnowflakeStreamingIngestChannel channel =
73-
new FakeSnowflakeStreamingIngestChannel(pipeName, channelName);
92+
new FakeSnowflakeStreamingIngestChannel(pipeName, channelName, this);
7493
channel.setChannelStatus(channelStatus); // Set default channel status
7594
openedChannels.add(channel);
7695
return new OpenChannelResult(channel, channelStatus);

src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIntoVariantIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212
import java.util.Collections;
1313
import java.util.List;
1414
import java.util.stream.Collectors;
15+
import org.junit.jupiter.api.Disabled;
1516
import org.junit.jupiter.api.Test;
1617

18+
@Disabled("Regression introduced on production FLOW-7864")
1719
public class IcebergIngestionIntoVariantIT extends IcebergIngestionIT {
1820

1921
@Test

src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergInitServiceIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.snowflake.kafka.connector.internal.TestUtils;
99
import org.junit.jupiter.api.*;
1010

11+
@Disabled("Regression introduced on production FLOW-7864")
1112
public class IcebergInitServiceIT extends BaseIcebergIT {
1213

1314
private static IcebergInitService icebergInitService;

0 commit comments

Comments
 (0)