Add Default Value Support for Kafka Connect #14584
Open
+595
−39
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Summary
This PR adds support for default values in Iceberg Kafka Connect, enabling automatic extraction and application of default values from Kafka Connect schemas during both auto-table creation and
schema evolution. Default values are only applied when the target Iceberg table uses format version 3 or higher, which introduced native support for column defaults.
Background
Iceberg format v3 introduced support for initial and write default values on columns. When a new column with a default value is added to a table, the default value is used for:
Kafka Connect schemas also support default values through the defaultValue() method on field schemas. This PR bridges these two systems, automatically transferring default values from Kafka Connect
to Iceberg tables when schema evolution occurs.
Behavior
Auto-Table Creation
When Kafka Connect auto-creates a new Iceberg table:
Schema Evolution on Existing Tables
When adding new columns to an existing table:
Example
Given a Kafka Connect schema:
Schema schema = SchemaBuilder.struct()
.field("id", Schema.INT32_SCHEMA)
.field("name", SchemaBuilder.string().defaultValue("unknown").build())
.field("age", SchemaBuilder.int32().defaultValue(0).build())
.field("active", SchemaBuilder.bool().defaultValue(true).build())
.build();
For a format v3 table:
For a format v2 table:
Compatibility
✅ Backward Compatible:
✅ Forward Compatible:
✅ Safe Fallback: