@@ -540,30 +540,38 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert
this.previousFlushTimeStampMs = System.currentTimeMillis();
return null;
}

InsertRowsResponse response = null;
try {
response = insertRowsWithFallback(streamingBufferToInsert);
// Updates the flush time (last time we called insertRows API)
this.previousFlushTimeStampMs = System.currentTimeMillis();

LOGGER.info(
"Successfully called insertRows for channel:{}, buffer:{}, insertResponseHasErrors:{},"
+ " needToResetOffset:{}",
this.getChannelNameFormatV1(),
streamingBufferToInsert,
response.hasErrors(),
response.needToResetOffset());
if (response.hasErrors()) {
handleInsertRowsFailures(
response.getInsertErrors(), streamingBufferToInsert.getSinkRecords());
}

// Due to schema evolution, we may need to reopen the channel and reset the offset in kafka
// since it's possible that not all rows are ingested
if (response.needToResetOffset()) {
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
return response;
}

// If there are errors other than schema mismatch, we need to handle them and reinsert the
// good rows
if (response.hasErrors()) {
handleInsertRowsFailures(
response.getInsertErrors(), streamingBufferToInsert.getSinkRecords());
insertBufferedRecords(
rebuildBufferWithoutErrorRows(streamingBufferToInsert, response.getInsertErrors()));
}

// Updates the flush time (last time we successfully inserted some rows)
this.previousFlushTimeStampMs = System.currentTimeMillis();

return response;
} catch (TopicPartitionChannelInsertionException ex) {
// Suppressing the exception because other channels might still continue to ingest
@@ -576,6 +584,22 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert
return response;
}

/** Building a new buffer which contains only the good rows from the original buffer */
private StreamingBuffer rebuildBufferWithoutErrorRows(
Collaborator: is there test coverage for this new method?

StreamingBuffer streamingBufferToInsert,
List<InsertValidationResponse.InsertError> insertErrors) {
StreamingBuffer buffer = new StreamingBuffer();
int errorIdx = 0;
for (long rowIdx = 0; rowIdx < streamingBufferToInsert.getNumOfRecords(); rowIdx++) {
if (errorIdx < insertErrors.size() && rowIdx == insertErrors.get(errorIdx).getRowIndex()) {
errorIdx++;
} else {
buffer.insert(streamingBufferToInsert.getSinkRecord(rowIdx));
}
}
return buffer;
}

Comment on lines +588 to +602
Collaborator: will this result in the validation error you added recently?
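A minimal sketch of the kind of unit test the coverage question above is asking for (illustrative only: it assumes rebuildBufferWithoutErrorRows and StreamingBuffer are reachable from TopicPartitionChannelTest, e.g. package-private, and that createNativeJsonSinkRecord is a hypothetical fixture producing a SinkRecord at a given offset):

  @Test
  public void testRebuildBufferWithoutErrorRows() {
    // Buffer with four records at offsets 0..3.
    StreamingBuffer original = new StreamingBuffer();
    for (long offset = 0; offset < 4; offset++) {
      original.insert(createNativeJsonSinkRecord(offset));
    }

    // Pretend insertRows reported rows 1 and 3 as failed (row content omitted for brevity).
    List<InsertValidationResponse.InsertError> insertErrors = new ArrayList<>();
    insertErrors.add(new InsertValidationResponse.InsertError(null, 1));
    insertErrors.add(new InsertValidationResponse.InsertError(null, 3));

    StreamingBuffer rebuilt =
        topicPartitionChannel.rebuildBufferWithoutErrorRows(original, insertErrors);

    // Only the good rows (offsets 0 and 2) should survive, in their original order.
    Assert.assertEquals(2, rebuilt.getNumOfRecords());
    Assert.assertEquals(0L, rebuilt.getSinkRecord(0).kafkaOffset());
    Assert.assertEquals(2L, rebuilt.getSinkRecord(1).kafkaOffset());
  }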

/**
* Uses {@link Fallback} API to reopen the channel if insertRows throws {@link SFException}.
*
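For context on the javadoc above: the fallback is built with the Failsafe library's Fallback policy, so an SFException thrown by insertRows triggers a channel reopen. A minimal sketch of that pattern, assuming hypothetical helpers reopenChannelAndResetOffset and doInsertRows (imports: dev.failsafe.Failsafe, dev.failsafe.Fallback, net.snowflake.ingest.utils.SFException); it is not the connector's actual fields or method names:

  Fallback<InsertRowsResponse> reopenChannelFallback =
      Fallback.<InsertRowsResponse>builder(
              event -> {
                // Recovery path: reopen the channel; Kafka rewinds to the last committed offset.
                reopenChannelAndResetOffset();
                return null;
              })
          .handle(SFException.class)
          .build();

  InsertRowsResponse response =
      Failsafe.with(reopenChannelFallback).get(() -> doInsertRows(streamingBuffer));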
@@ -657,52 +681,28 @@ public InsertRowsResponse get() throws Throwable {
Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
this.insertRowsStreamingBuffer.getData();
List<Map<String, Object>> records = recordsAndOffsets.getKey();
List<Long> offsets = recordsAndOffsets.getValue();
InsertValidationResponse finalResponse = new InsertValidationResponse();
boolean needToResetOffset = false;
if (!enableSchemaEvolution) {
finalResponse =
this.channel.insertRows(
records,
Long.toString(this.insertRowsStreamingBuffer.getFirstOffset()),
Long.toString(this.insertRowsStreamingBuffer.getLastOffset()));
} else {
for (int idx = 0; idx < records.size(); idx++) {
// For schema evolution, we need to call the insertRows API row by row in order to
// preserve the original order, for anything after the first schema mismatch error we will
// retry after the evolution
InsertValidationResponse response =
this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
long originalSinkRecordIdx =
offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
if (extraColNames == null && nonNullableColumns == null) {
InsertValidationResponse.InsertError newInsertError =
new InsertValidationResponse.InsertError(
insertError.getRowContent(), originalSinkRecordIdx);
newInsertError.setException(insertError.getException());
newInsertError.setExtraColNames(insertError.getExtraColNames());
newInsertError.setMissingNotNullColNames(insertError.getMissingNotNullColNames());
// Simply added to the final response if it's not schema related errors
finalResponse.addError(insertError);
Comment on lines -682 to -690
Contributor: @sfc-gh-tzhang why did you remove this part? IMO we still need it to properly add the rows to the rebuilt buffer and also send only the non-schema errors to the DLQ.

} else {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
}
InsertValidationResponse response =
this.channel.insertRows(
records,
Long.toString(this.insertRowsStreamingBuffer.getFirstOffset()),
Long.toString(this.insertRowsStreamingBuffer.getLastOffset()));
if (enableSchemaEvolution) {
for (InsertValidationResponse.InsertError insertError : response.getInsertErrors()) {
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
if (extraColNames != null || nonNullableColumns != null) {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
this.insertRowsStreamingBuffer.getSinkRecord(insertError.getRowIndex()));
needToResetOffset = true;
}
}
}
return new InsertRowsResponse(finalResponse, needToResetOffset);
return new InsertRowsResponse(response, needToResetOffset);
}
}

@@ -1059,7 +1059,7 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
.setDBName(this.sfConnectorConfig.get(Utils.SF_DATABASE))
.setSchemaName(this.sfConnectorConfig.get(Utils.SF_SCHEMA))
.setTableName(this.tableName)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.SKIP_BATCH)
Contributor: This will be a BCR?
Contributor Author: I don't think so; there is no behavior difference as far as the customer is concerned.

.setOffsetTokenVerificationFunction(StreamingUtils.offsetTokenVerificationFunction)
.build();
LOGGER.info(
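On the CONTINUE to SKIP_BATCH change above: with ON_ERROR=CONTINUE the ingest SDK loads the valid rows of a batch and only reports the invalid ones, so part of the batch may already be committed when errors come back; with SKIP_BATCH the SDK is expected to reject the whole batch if any row fails validation, which is what allows the connector to rebuild the buffer without the error rows and reinsert it. A rough sketch of reacting to a skipped batch (sendToDlq and withoutErrorRows are hypothetical helpers, not connector code):

  InsertValidationResponse response =
      channel.insertRows(rows, Long.toString(firstOffset), Long.toString(lastOffset));
  if (response.hasErrors()) {
    // Under SKIP_BATCH nothing from this batch was ingested, so the reported error rows can be
    // dropped and only the remaining rows retried.
    sendToDlq(response.getInsertErrors());
    List<Map<String, Object>> goodRows = withoutErrorRows(rows, response.getInsertErrors());
    channel.insertRows(goodRows, Long.toString(firstOffset), Long.toString(lastOffset));
  }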
@@ -626,11 +626,11 @@ public void testInsertRowsWithSchemaEvolution() throws Exception {
validationResponse2.addError(insertError2);

Mockito.when(
mockStreamingChannel.insertRow(
ArgumentMatchers.any(), ArgumentMatchers.any(String.class)))
.thenReturn(new InsertValidationResponse())
mockStreamingChannel.insertRows(
ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(validationResponse2)
.thenReturn(validationResponse1)
.thenReturn(validationResponse2);
.thenReturn(new InsertValidationResponse());

Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn("0");

@@ -686,6 +686,23 @@ public void testInsertRowsWithSchemaEvolution() throws Exception {

topicPartitionChannel.insertBufferedRecordsIfFlushTimeThresholdReached();

// Verify that the buffer is cleaned up and nothing is in DLQ because of schematization error
Assert.assertTrue(topicPartitionChannel.isPartitionBufferEmpty());
Assert.assertEquals(0, kafkaRecordErrorReporter.getReportedRecords().size());

// Do it again without any schematization error, and we should have a row in the DLQ
Collaborator (@sfc-gh-japatel, Mar 8, 2024): I am confused about the comment. Do you mean there should not be a row in the DLQ if there is no schematization error? Or do you mean to verify the DLQ results with something along these lines: Assert.assertEquals(>1, kafkaRecordErrorReporter.getReportedRecords().size())?

for (int idx = 0; idx < records.size(); idx++) {
topicPartitionChannel.insertRecordToBuffer(records.get(idx), idx == 0);
}

// In an ideal world, put API is going to invoke this to check if flush time threshold has
// reached.
// We are mimicking that call.
// Will wait for 10 seconds.
Thread.sleep(bufferFlushTimeSeconds * 1000 + 10);

topicPartitionChannel.insertBufferedRecordsIfFlushTimeThresholdReached();

// Verify that the buffer is cleaned up and one record is in the DLQ
Assert.assertTrue(topicPartitionChannel.isPartitionBufferEmpty());
Assert.assertEquals(1, kafkaRecordErrorReporter.getReportedRecords().size());