We use `confluentinc/cp-kafka-connect:7.8.0` with `snowflakeinc/snowflake-kafka-connector:3.2.0`.

I thought we could use `google/protobuf/timestamp.proto` for timestamp fields, and it almost works. I registered this proto file in the schema registry:
```protobuf
syntax = "proto3";

option go_package = "iceberg-timestamp-issue/pkg/pbevents/searches;searches";

import "google/protobuf/timestamp.proto";

message Event {
  string search_id = 3;
  google.protobuf.Timestamp timestamp_received = 2;
}
```
protoc versions:

```
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// 	protoc-gen-go v1.35.2
// 	protoc        v5.29.3
// source: pkg/pbevents/searches/event.proto
```
The Kafka connector indeed evolved the table schema to the new types:
```sql
create or replace ICEBERG TABLE ANALYTICS.TESTING.TEST_TIMESTAMP_ISSUE (
    RECORD_METADATA OBJECT(offset LONG, topic STRING, partition INT, key STRING, schema_id INT, key_schema_id INT, CreateTime LONG, LogAppendTime LONG, SnowflakeConnectorPushTime LONG, headers MAP(STRING, STRING)),
    SEARCH_ID STRING COMMENT 'column created by schema evolution from Snowflake Kafka Connector',
    TIMESTAMP_RECEIVED TIMESTAMP_NTZ(6) COMMENT 'column created by schema evolution from Snowflake Kafka Connector'
)
EXTERNAL_VOLUME = 'EVENTS_EXTERNAL_VOLUME'
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'iceberg_test/timestamp_issue/iceberg/events/';
```
But for some reason, data conversion fails when the Go producer sends events with these timestamps:
```go
event := &searches.Event{
	SearchId:          strconv.Itoa(i),
	TimestampReceived: timestamppb.New(time.Now()),
}
err := sendEvent(event)
```
The Kafka connector throws this exception:

```
kafka_iceberg_ingesting | [2025-07-01 07:50:06,437] ERROR [test-timestamp-issue|task-0] WorkerSinkTask{id=test-timestamp-issue-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error inserting Records using Streaming API with msg:The given row cannot be converted to the internal format: Object of type java.lang.Long cannot be ingested into Snowflake column TIMESTAMP_RECEIVED of type TIMESTAMP, rowIndex:0. Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime, OffsetDateTime (org.apache.kafka.connect.runtime.WorkerSinkTask:633)
kafka_iceberg_ingesting | org.apache.kafka.connect.errors.DataException: Error inserting Records using Streaming API with msg:The given row cannot be converted to the internal format: Object of type java.lang.Long cannot be ingested into Snowflake column TIMESTAMP_RECEIVED of type TIMESTAMP, rowIndex:0. Allowed Java types: String, LocalDate, LocalDateTime, ZonedDateTime, OffsetDateTime
```

The full exception is much longer, but this is the root cause.
Expected behaviour: `google.protobuf.Timestamp` fields should be inserted into the `TIMESTAMP_NTZ` column without errors.
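As a possible interim workaround (untested, and only a sketch: whether an SMT runs early enough relative to this connector's internal conversion is not guaranteed), Kafka Connect's built-in `TimestampConverter` transform can rewrite the field to a string, which is one of the accepted Java types; the transform name `tsToString` and the date format are assumptions:

```json
{
  "transforms": "tsToString",
  "transforms.tsToString.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.tsToString.field": "timestamp_received",
  "transforms.tsToString.target.type": "string",
  "transforms.tsToString.format": "yyyy-MM-dd HH:mm:ss.SSS"
}
```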