Description
Environment:
- Kafka 3.9.0 / Snowflake Kafka connector 3.1.2
- Also tried with Kafka 3.3.1 / Snowflake Kafka connector 2.0.0
- Snowflake Cloud
Issue:
- We're trying to stream data along the path Java app > Kafka > Snowflake.
- Java app > Kafka works well, and we can see the data in the topic.
The following sample data is streamed from the Java app to Kafka, and we can see it in the topic:
{"SalesDocumentItemCategory":"ASD","BillingRelevanceCode":"CODE1","ScheduleLineIsAllowed":"Y","PricingRelevance":"NA","TextDeterminationProcedure":"99","PartnerDeterminationProcedure":"N","PropagatePrftbltySgmt2BOM":"BOM1","CostDeterminationIsRequired":"X"}
{"SalesDocumentItemCategory":"DAS","BillingRelevanceCode":"CODE2","ScheduleLineIsAllowed":"Y","PricingRelevance":"NA","TextDeterminationProcedure":"99","PartnerDeterminationProcedure":"N","PropagatePrftbltySgmt2BOM":"BOM1","CostDeterminationIsRequired":"X"}
- For Kafka > Snowflake, we use the Snowflake Kafka connector with Snowpipe Streaming.
- We receive an error (pasted at the end) while running the Snowflake Kafka connector.
- We're using the Snowpipe Streaming ingestion method because we need schematization.
- Although the connector throws an error, we can see the table created in SF, but without any rows or columns.
- If we take the same sample data shown above and produce it manually with kafka-console-producer, the data is written into SF with the desired schema.
Are we missing any config for the connector?
The connector properties are as follows:
name=kfka_sf_connector3
topics=TP_BILLINGDOCUMENT
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
snowflake.url.name=xxx.snowflakecomputing.com
snowflake.user.name=xxx
snowflake.private.key=
snowflake.database.name=DB_TRFT
snowflake.schema.name=SCH_SAL1
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
#value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter.schemas.enable=false
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=TRUE
snowflake.role.name=kafka_connector_role
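One thing worth ruling out (an assumption, not a confirmed cause): the trace at the end fails inside org.apache.kafka.connect.json.JsonConverter on the token 'ASD' at line 1, column 4. That would be consistent with a three-character plain-string payload, for example a record key set by the Java app, rather than with the JSON values shown above. kafka-console-producer sends null keys unless parse.key is enabled, which might explain why that path works. A variant of the converter settings to test this, using only converter classes already listed in the file:

```properties
# Sketch only: assumes the Java app sets a plain-string record key (not confirmed).
# Read the key as a raw string so a non-JSON key cannot fail JSON parsing.
key.converter=org.apache.kafka.connect.storage.StringConverter
# Parse the value as schemaless JSON with the community converter.
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```

If the task still fails with this variant, the key is probably not the cause, and the value bytes written by the Java app would be the next thing to inspect.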
Error:
[2025-04-09 14:28:20,848] ERROR [dsp_kfka_sf_connector3|task-7] WorkerSinkTask{id=kfka_sf_connector3-7} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:234)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:261)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:518)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:226)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:347)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:91)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:538)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:245)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ASD': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 4]
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:75)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:345)
... 18 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ASD': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: REDACTED (StreamReadFeature.INCLUDE_SOURCE_IN_LOCATION disabled); line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2481)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:762)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2791)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:911)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:797)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4928)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3292)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:73)
... 19 more