22
33import static com .snowflake .kafka .connector .SnowflakeSinkConnectorConfig .ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_CONFIG ;
44import static com .snowflake .kafka .connector .SnowflakeSinkConnectorConfig .ENABLE_CHANNEL_OFFSET_TOKEN_MIGRATION_DEFAULT ;
5- import static com .snowflake .kafka .connector .SnowflakeSinkConnectorConfig .ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG ;
6- import static com .snowflake .kafka .connector .SnowflakeSinkConnectorConfig .ERRORS_TOLERANCE_CONFIG ;
75import static java .util .stream .Collectors .toMap ;
86
97import com .google .common .annotations .VisibleForTesting ;
108import com .google .common .base .MoreObjects ;
119import com .google .common .base .Preconditions ;
12- import com .google .common .base .Strings ;
1310import com .snowflake .kafka .connector .SnowflakeSinkConnectorConfig ;
1411import com .snowflake .kafka .connector .Utils ;
1512import com .snowflake .kafka .connector .dlq .KafkaRecordErrorReporter ;
4441import net .snowflake .ingest .utils .SFException ;
4542import org .apache .kafka .common .TopicPartition ;
4643import org .apache .kafka .connect .errors .ConnectException ;
47- import org .apache .kafka .connect .errors .DataException ;
4844import org .apache .kafka .connect .sink .SinkRecord ;
4945import org .apache .kafka .connect .sink .SinkTaskContext ;
5046
@@ -94,9 +90,6 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
9490 /* Error handling, DB, schema, Snowflake URL and other snowflake specific connector properties are defined here. */
9591 private final Map <String , String > sfConnectorConfig ;
9692
97- /* Responsible for returning errors to DLQ if records have failed to be ingested. */
98- private final KafkaRecordErrorReporter kafkaRecordErrorReporter ;
99-
10093 private final SchemaEvolutionService schemaEvolutionService ;
10194
10295 /**
@@ -105,20 +98,6 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
10598 */
10699 private final SinkTaskContext sinkTaskContext ;
107100
108- /* Error related properties */
109-
110- // If set to true, we will send records to DLQ provided DLQ name is valid.
111- private final boolean errorTolerance ;
112-
113- // Whether to log errors to log file.
114- private final boolean logErrors ;
115-
116- // Set to false if DLQ topic is null or empty. True if it is a valid string in config
117- private final boolean isDLQTopicSet ;
118-
119- // Whether schematization has been enabled.
120- private final boolean enableSchematization ;
121-
122101 // Whether schema evolution could be done on this channel
123102 private final boolean enableSchemaEvolution ;
124103
@@ -141,6 +120,8 @@ public class DirectTopicPartitionChannel implements TopicPartitionChannel {
141120
142121 private final FailsafeExecutor <Long > offsetTokenExecutor ;
143122
123+ private final StreamingErrorHandler streamingErrorHandler ;
124+
144125 /** Testing only, initialize TopicPartitionChannel without the connection service */
145126 @ VisibleForTesting
146127 public DirectTopicPartitionChannel (
@@ -162,7 +143,6 @@ public DirectTopicPartitionChannel(
162143 tableName ,
163144 false , /* No schema evolution permission */
164145 sfConnectorConfig ,
165- kafkaRecordErrorReporter ,
166146 sinkTaskContext ,
167147 conn ,
168148 new StreamingRecordService (
@@ -173,7 +153,8 @@ public DirectTopicPartitionChannel(
173153 false ,
174154 null ,
175155 schemaEvolutionService ,
176- insertErrorMapper );
156+ insertErrorMapper ,
157+ new StreamingErrorHandler (sfConnectorConfig , kafkaRecordErrorReporter , telemetryService ));
177158 }
178159
179160 /**
@@ -192,23 +173,24 @@ public DirectTopicPartitionChannel(
192173 * @param telemetryService Telemetry Service which includes the Telemetry Client, sends Json data
193174 * to Snowflake
194175 * @param insertErrorMapper Mapper to map insert errors to schema evolution items
176+ * @param streamingErrorHandler contains DLQ and error logging related logic
195177 */
196178 public DirectTopicPartitionChannel (
197179 SnowflakeStreamingIngestClient streamingIngestClient ,
198180 TopicPartition topicPartition ,
199181 final String channelNameFormatV1 ,
200182 final String tableName ,
201- boolean hasSchemaEvolutionPermission ,
183+ final boolean enableSchemaEvolution ,
202184 final Map <String , String > sfConnectorConfig ,
203- KafkaRecordErrorReporter kafkaRecordErrorReporter ,
204185 SinkTaskContext sinkTaskContext ,
205186 SnowflakeConnectionService conn ,
206187 StreamingRecordService streamingRecordService ,
207188 SnowflakeTelemetryService telemetryService ,
208189 boolean enableCustomJMXMonitoring ,
209190 MetricsJmxReporter metricsJmxReporter ,
210191 SchemaEvolutionService schemaEvolutionService ,
211- InsertErrorMapper insertErrorMapper ) {
192+ InsertErrorMapper insertErrorMapper ,
193+ StreamingErrorHandler streamingErrorHandler ) {
212194 final long startTime = System .currentTimeMillis ();
213195
214196 this .streamingIngestClient = Preconditions .checkNotNull (streamingIngestClient );
@@ -217,23 +199,13 @@ public DirectTopicPartitionChannel(
217199 this .channelNameFormatV1 = Preconditions .checkNotNull (channelNameFormatV1 );
218200 this .tableName = Preconditions .checkNotNull (tableName );
219201 this .sfConnectorConfig = Preconditions .checkNotNull (sfConnectorConfig );
220- this .kafkaRecordErrorReporter = Preconditions .checkNotNull (kafkaRecordErrorReporter );
221202 this .sinkTaskContext = Preconditions .checkNotNull (sinkTaskContext );
222203 this .conn = conn ;
223204
224205 this .streamingRecordService = streamingRecordService ;
225206 this .telemetryServiceV2 = Preconditions .checkNotNull (telemetryService );
226207
227- /* Error properties */
228- this .errorTolerance = StreamingUtils .tolerateErrors (this .sfConnectorConfig );
229- this .logErrors = StreamingUtils .logErrors (this .sfConnectorConfig );
230- this .isDLQTopicSet =
231- !Strings .isNullOrEmpty (StreamingUtils .getDlqTopicName (this .sfConnectorConfig ));
232-
233- /* Schematization related properties */
234- this .enableSchematization = Utils .isSchematizationEnabled (this .sfConnectorConfig );
235-
236- this .enableSchemaEvolution = this .enableSchematization && hasSchemaEvolutionPermission ;
208+ this .enableSchemaEvolution = enableSchemaEvolution ;
237209 this .schemaEvolutionService = schemaEvolutionService ;
238210
239211 this .channelOffsetTokenMigrator = new ChannelOffsetTokenMigrator (conn , telemetryService );
@@ -291,6 +263,7 @@ public DirectTopicPartitionChannel(
291263 + " correct offset instead" ,
292264 this .getChannelNameFormatV1 ());
293265 }
266+ this .streamingErrorHandler = streamingErrorHandler ;
294267 }
295268
296269 /**
@@ -479,7 +452,7 @@ private void handleInsertRowFailure(
479452 this .getChannelNameFormatV1 (),
480453 e );
481454 if (Objects .equals (e .getCode (), SnowflakeErrors .ERROR_5026 .getCode ())) {
482- handleError (Collections .singletonList (e ), kafkaSinkRecord );
455+ streamingErrorHandler . handleError (Collections .singletonList (e ), kafkaSinkRecord );
483456 } else {
484457 throw e ;
485458 }
@@ -489,7 +462,7 @@ private void handleInsertRowFailure(
489462 }
490463 }
491464
492- handleError (
465+ streamingErrorHandler . handleError (
493466 insertErrors .stream ()
494467 .map (InsertValidationResponse .InsertError ::getException )
495468 .collect (Collectors .toList ()),
@@ -501,42 +474,6 @@ private Map<String, ColumnProperties> getTableSchemaFromChannel() {
501474 .collect (toMap (Map .Entry ::getKey , entry -> new ColumnProperties (entry .getValue ())));
502475 }
503476
504- private void handleError (List <Exception > insertErrors , SinkRecord kafkaSinkRecord ) {
505- if (logErrors ) {
506- for (Exception insertError : insertErrors ) {
507- LOGGER .error ("Insert Row Error message:{}" , insertError .getMessage ());
508- }
509- }
510- if (errorTolerance ) {
511- if (!isDLQTopicSet ) {
512- LOGGER .warn (
513- "{} is set, however {} is not. The message will not be added to the Dead Letter Queue"
514- + " topic." ,
515- ERRORS_TOLERANCE_CONFIG ,
516- ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG );
517- } else {
518- LOGGER .warn (
519- "Adding the message to Dead Letter Queue topic: {}" ,
520- ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG );
521- this .kafkaRecordErrorReporter .reportError (
522- kafkaSinkRecord ,
523- insertErrors .stream ()
524- .findFirst ()
525- .orElseThrow (
526- () ->
527- new IllegalStateException (
528- "Reported record error, however exception list is empty." )));
529- }
530- } else {
531- final String errMsg =
532- String .format (
533- "Error inserting Records using Streaming API with msg:%s" ,
534- insertErrors .get (0 ).getMessage ());
535- this .telemetryServiceV2 .reportKafkaConnectFatalError (errMsg );
536- throw new DataException (errMsg , insertErrors .get (0 ));
537- }
538- }
539-
540477 @ Override
541478 @ VisibleForTesting
542479 public long fetchOffsetTokenWithRetry () {
0 commit comments