diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
index 9650ce16270c..2d3ee4a4f917 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java
@@ -107,6 +107,8 @@ public class IcebergSinkConfig extends AbstractConfig {
   @VisibleForTesting
   static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";
 
+  public static final int DEFAULT_VALUE_MIN_FORMAT_VERSION = 3;
+
   public static final ConfigDef CONFIG_DEF = newConfigDef();
 
   public static String version() {
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
index 92f5af2d7a87..406fcfbb1c8e 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -72,6 +73,11 @@ RecordWriter createWriter(String tableName, SinkRecord sample, boolean ignoreMissingTable) {
 
   @VisibleForTesting
   Table autoCreateTable(String tableName, SinkRecord sample) {
+    // Determine the format version that will be used for the table
+    int formatVersion =
+        Integer.parseInt(
+            config.autoCreateProps().getOrDefault(TableProperties.FORMAT_VERSION, "-1"));
+
     StructType structType;
     if (sample.valueSchema() == null) {
       Type type = SchemaUtils.inferIcebergType(sample.value(), config);
@@ -80,7 +86,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
       }
       structType = type.asStructType();
     } else {
-      structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
+      structType =
+          SchemaUtils.toIcebergType(sample.valueSchema(), config, formatVersion).asStructType();
     }
 
     org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
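During auto-create, the format version is resolved purely from the connector's table auto-create properties, with -1 as the "unknown" sentinel when `format-version` is unset. A minimal, self-contained sketch of that resolution; the property map is hypothetical, while "format-version" is the value of the real TableProperties.FORMAT_VERSION constant:

import java.util.Map;

public class FormatVersionResolution {
  // Mirrors TableProperties.FORMAT_VERSION; hard-coded to keep the sketch dependency-free.
  static final String FORMAT_VERSION = "format-version";

  public static void main(String[] args) {
    // Hypothetical connector setting; an empty map would yield -1 and disable default handling.
    Map<String, String> autoCreateProps = Map.of(FORMAT_VERSION, "3");
    int formatVersion = Integer.parseInt(autoCreateProps.getOrDefault(FORMAT_VERSION, "-1"));
    System.out.println(formatVersion); // 3
  }
}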
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
index 1a57a6444870..f2a5daaec3f3 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
@@ -45,6 +45,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.connect.IcebergSinkConfig;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -66,8 +67,12 @@
 import org.apache.iceberg.util.UUIDUtil;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class RecordConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(RecordConverter.class);
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
@@ -81,11 +86,21 @@ class RecordConverter {
   private final NameMapping nameMapping;
   private final IcebergSinkConfig config;
   private final Map<Integer, Map<String, NestedField>> structNameMap = Maps.newHashMap();
+  private final int tableFormatVersion;
 
   RecordConverter(Table table, IcebergSinkConfig config) {
     this.tableSchema = table.schema();
     this.nameMapping = createNameMapping(table);
     this.config = config;
+
+    int formatVersion;
+    try {
+      formatVersion = TableUtil.formatVersion(table);
+    } catch (Exception ex) {
+      LOG.error("Failed to retrieve format version from table {}", table.name(), ex);
+      formatVersion = -1;
+    }
+    this.tableFormatVersion = formatVersion;
   }
 
   Record convert(Object data) {
@@ -217,8 +232,23 @@ private GenericRecord convertToStruct(
           if (schemaUpdateConsumer != null) {
             String parentFieldName =
                 structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
-            Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
-            schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
+            Type type =
+                SchemaUtils.toIcebergType(recordField.schema(), config, tableFormatVersion);
+            org.apache.iceberg.expressions.Literal<?> defaultLiteral = null;
+            if (tableFormatVersion >= IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION) {
+              // Extract default value from Kafka Connect schema if present
+              Object defaultValue = recordField.schema().defaultValue();
+              if (defaultValue != null) {
+                defaultLiteral = SchemaUtils.convertDefaultValue(defaultValue, type);
+              }
+            } else {
+              LOG.info(
+                  "Format version ({}) < min format version ({}) required for default value support",
+                  tableFormatVersion,
+                  IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION);
+            }
+            schemaUpdateConsumer.addColumn(
+                parentFieldName, recordField.name(), type, defaultLiteral);
           }
         } else {
           boolean hasSchemaUpdates = false;
@@ -385,7 +415,13 @@ protected String convertString(Object value) {
         return MAPPER.writeValueAsString(value);
       } else if (value instanceof Struct) {
         Struct struct = (Struct) value;
-        byte[] data = config.jsonConverter().fromConnectData(null, struct.schema(), struct);
+        byte[] data;
+        try (JsonConverter jsonConverter = config.jsonConverter()) {
+          data = jsonConverter.fromConnectData(null, struct.schema(), struct);
+        } catch (Exception ex) {
+          LOG.error("Error while converting string of type struct", ex);
+          throw ex;
+        }
         return new String(data, StandardCharsets.UTF_8);
       }
     } catch (IOException e) {
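The converter resolves the version once per table at construction time rather than per record. The same guard in isolation, as a sketch assuming an Iceberg Table handle; TableUtil.formatVersion is the API the patch itself uses:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;

public final class FormatVersions {
  private FormatVersions() {}

  /** Returns the table's format version, or -1 when it cannot be determined. */
  public static int safeFormatVersion(Table table) {
    try {
      return TableUtil.formatVersion(table);
    } catch (RuntimeException ex) {
      // Unknown version: callers treat -1 as "below v3", so defaults are skipped, not guessed.
      return -1;
    }
  }
}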
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java
index 809bea84dcc2..da4705bd6063 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUpdate.java
@@ -20,6 +20,7 @@
 
 import java.util.Collection;
 import java.util.Map;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Type.PrimitiveType;
@@ -48,7 +49,11 @@ boolean empty() {
     }
 
     void addColumn(String parentName, String name, Type type) {
-      AddColumn addCol = new AddColumn(parentName, name, type);
+      addColumn(parentName, name, type, null);
+    }
+
+    void addColumn(String parentName, String name, Type type, Literal<?> defaultValue) {
+      AddColumn addCol = new AddColumn(parentName, name, type, defaultValue);
       addColumns.put(addCol.key(), addCol);
     }
 
@@ -65,11 +70,13 @@ static class AddColumn extends SchemaUpdate {
     private final String parentName;
     private final String name;
     private final Type type;
+    private final Literal<?> defaultValue;
 
-    AddColumn(String parentName, String name, Type type) {
+    AddColumn(String parentName, String name, Type type, Literal<?> defaultValue) {
       this.parentName = parentName;
       this.name = name;
       this.type = type;
+      this.defaultValue = defaultValue;
     }
 
     String parentName() {
@@ -87,6 +94,10 @@ String key() {
     Type type() {
       return type;
     }
+
+    Literal<?> defaultValue() {
+      return defaultValue;
+    }
   }
 
   static class UpdateType extends SchemaUpdate {
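The three-argument overload now delegates with a null default, so existing callers are unaffected. A sketch of the new overload in use, mirroring the unit tests further down; it lives in the same package because SchemaUpdate is package-private:

// Same-package sketch; Expressions.lit is the literal factory the patch uses elsewhere.
package org.apache.iceberg.connect.data;

import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;

class AddColumnExample {
  public static void main(String[] args) {
    SchemaUpdate.Consumer consumer = new SchemaUpdate.Consumer();
    // New path: queue an add-column update carrying a default literal.
    consumer.addColumn("parent", "name", Types.StringType.get(), Expressions.lit("unknown"));
    // Old path still works and records a null default.
    consumer.addColumn("parent", "legacy", Types.IntegerType.get());
  }
}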
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java
index b0dd56b45d67..c7cc019c5456 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.connect.data;
 
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -36,6 +37,8 @@
 import org.apache.iceberg.connect.data.SchemaUpdate.AddColumn;
 import org.apache.iceberg.connect.data.SchemaUpdate.MakeOptional;
 import org.apache.iceberg.connect.data.SchemaUpdate.UpdateType;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Type.PrimitiveType;
@@ -123,7 +126,9 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updates) {
     // apply the updates
     UpdateSchema updateSchema = table.updateSchema();
     addColumns.forEach(
-        update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
+        update ->
+            updateSchema.addColumn(
+                update.parentName(), update.name(), update.type(), null, update.defaultValue()));
     updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
     makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
     updateSchema.commit();
@@ -209,21 +214,201 @@ private static Pair<String, Integer> transformArgPair(String argsStr) {
     return Pair.of(parts.get(0).trim(), Integer.parseInt(parts.get(1).trim()));
   }
 
-  static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config) {
-    return new SchemaGenerator(config).toIcebergType(valueSchema);
+  static Type toIcebergType(Schema valueSchema, IcebergSinkConfig config, int formatVersion) {
+    return new SchemaGenerator(config, formatVersion).toIcebergType(valueSchema);
   }
 
   static Type inferIcebergType(Object value, IcebergSinkConfig config) {
-    return new SchemaGenerator(config).inferIcebergType(value);
+    return new SchemaGenerator(config, IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION)
+        .inferIcebergType(value);
+  }
+
+  /**
+   * Converts a Kafka Connect default value to an Iceberg Literal based on the provided Iceberg
+   * type.
+   *
+   * @param defaultValue the default value from Kafka Connect schema
+   * @param icebergType the target Iceberg type
+   * @return an Iceberg Literal representing the default value, or null if conversion is not
+   *     possible
+   */
+  static Literal<?> convertDefaultValue(Object defaultValue, Type icebergType) {
+    if (defaultValue == null) {
+      return null;
+    }
+
+    try {
+      TypeID typeId = icebergType.typeId();
+      switch (typeId) {
+        case BOOLEAN:
+          return Expressions.lit((Boolean) defaultValue);
+        case INTEGER:
+          return convertIntegerDefault(defaultValue);
+        case LONG:
+          return convertLongDefault(defaultValue);
+        case FLOAT:
+          return convertFloatDefault(defaultValue);
+        case DOUBLE:
+          return convertDoubleDefault(defaultValue);
+        case STRING:
+          return Expressions.lit(defaultValue.toString());
+        case DECIMAL:
+          return convertDecimalDefault(defaultValue, (DecimalType) icebergType);
+        case DATE:
+          return convertDateDefault(defaultValue, icebergType);
+        case TIME:
+          return convertTimeDefault(defaultValue, icebergType);
+        case TIMESTAMP:
+          return convertTimestampDefault(defaultValue);
+        case BINARY:
+        case FIXED:
+          return convertBinaryDefault(defaultValue);
+        case UUID:
+          return convertUuidDefault(defaultValue);
+        default:
+          // Nested types (LIST, MAP, STRUCT) cannot have non-null defaults in Iceberg
+          LOG.warn(
+              "Default value conversion not supported for type: {}. Nested types can only have null defaults.",
+              typeId);
+          return null;
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to convert default value {} to Iceberg type {}", defaultValue, icebergType, e);
+      return null;
+    }
+  }
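A quick feel for the helper's contract: primitives and strings convert to literals, while nested types intentionally yield null, since Iceberg only allows null defaults for them. A same-package sketch (convertDefaultValue is package-private); the values match the tests below:

package org.apache.iceberg.connect.data;

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;

class ConvertDefaultValueExample {
  public static void main(String[] args) {
    Literal<?> intLit = SchemaUtils.convertDefaultValue(42, Types.IntegerType.get());
    System.out.println(intLit.value()); // 42

    Literal<?> listLit =
        SchemaUtils.convertDefaultValue(
            ImmutableList.of("x"), Types.ListType.ofOptional(1, Types.StringType.get()));
    System.out.println(listLit); // null: nested types can only carry null defaults
  }
}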
+
+  private static Literal<?> convertIntegerDefault(Object defaultValue) {
+    if (defaultValue instanceof Number) {
+      return Expressions.lit(((Number) defaultValue).intValue());
+    }
+    return null;
+  }
+
+  private static Literal<?> convertLongDefault(Object defaultValue) {
+    if (defaultValue instanceof Number) {
+      return Expressions.lit(((Number) defaultValue).longValue());
+    }
+    return null;
+  }
+
+  private static Literal<?> convertFloatDefault(Object defaultValue) {
+    if (defaultValue instanceof Number) {
+      return Expressions.lit(((Number) defaultValue).floatValue());
+    }
+    return null;
+  }
+
+  private static Literal<?> convertDoubleDefault(Object defaultValue) {
+    if (defaultValue instanceof Number) {
+      return Expressions.lit(((Number) defaultValue).doubleValue());
+    }
+    return null;
+  }
+
+  private static Literal<?> convertDecimalDefault(Object defaultValue, DecimalType decimalType) {
+    if (defaultValue instanceof BigDecimal) {
+      return Expressions.lit((BigDecimal) defaultValue);
+    } else if (defaultValue instanceof byte[]) {
+      // Kafka Connect Decimal can be a byte array
+      // BigDecimal constructor takes (unscaledValue, scale)
+      // Precision is determined by the number of digits in unscaledValue
+      BigDecimal decimal =
+          new BigDecimal(new java.math.BigInteger((byte[]) defaultValue), decimalType.scale());
+      return Expressions.lit(decimal);
+    }
+    return null;
+  }
+
+  private static Literal<?> convertDateDefault(Object defaultValue, Type icebergType) {
+    // Iceberg Date is stored as days from epoch (int)
+    if (defaultValue instanceof java.util.Date) {
+      // Convert java.util.Date to days from epoch
+      long epochDay =
+          ((java.util.Date) defaultValue)
+              .toInstant()
+              .atZone(java.time.ZoneOffset.UTC)
+              .toLocalDate()
+              .toEpochDay();
+      // Create an IntegerLiteral and convert to DateLiteral using .to(Type)
+      return Expressions.lit((int) epochDay).to(icebergType);
+    } else if (defaultValue instanceof Number) {
+      // Already in days from epoch
+      return Expressions.lit(((Number) defaultValue).intValue()).to(icebergType);
+    } else if (defaultValue instanceof LocalDate) {
+      int days = (int) ((LocalDate) defaultValue).toEpochDay();
+      return Expressions.lit(days).to(icebergType);
+    }
+    return null;
+  }
+
+  private static Literal<?> convertTimeDefault(Object defaultValue, Type icebergType) {
+    // Iceberg Time is stored as microseconds from midnight (long)
+    if (defaultValue instanceof java.util.Date) {
+      // Kafka Connect Time is milliseconds since midnight
+      long millis = ((java.util.Date) defaultValue).getTime();
+      // Create a LongLiteral and convert to TimeLiteral using .to(Type)
+      return Expressions.lit(millis * 1000).to(icebergType);
+    } else if (defaultValue instanceof Number) {
+      // Assume microseconds from midnight
+      return Expressions.lit(((Number) defaultValue).longValue()).to(icebergType);
+    } else if (defaultValue instanceof LocalTime) {
+      long micros = ((LocalTime) defaultValue).toNanoOfDay() / 1000;
+      return Expressions.lit(micros).to(icebergType);
+    }
+    return null;
+  }
+
+  private static Literal<?> convertTimestampDefault(Object defaultValue) {
+    // Iceberg Timestamp is stored as microseconds from epoch (long)
+    if (defaultValue instanceof java.util.Date) {
+      // Kafka Connect Timestamp is milliseconds from epoch
+      long micros = ((java.util.Date) defaultValue).getTime() * 1000;
+      // Create a TimestampLiteral directly using micros(), which returns the correct type
+      return Expressions.micros(micros);
+    } else if (defaultValue instanceof Number) {
+      // Assume microseconds from epoch
+      return Expressions.micros(((Number) defaultValue).longValue());
+    } else if (defaultValue instanceof LocalDateTime) {
+      long micros =
+          ((LocalDateTime) defaultValue).atZone(java.time.ZoneOffset.UTC).toInstant().toEpochMilli()
+              * 1000;
+      return Expressions.micros(micros);
+    } else if (defaultValue instanceof OffsetDateTime) {
+      long micros = ((OffsetDateTime) defaultValue).toInstant().toEpochMilli() * 1000;
+      return Expressions.micros(micros);
+    }
+    return null;
+  }
+
+  private static Literal<?> convertBinaryDefault(Object defaultValue) {
+    if (defaultValue instanceof byte[]) {
+      return Expressions.lit(ByteBuffer.wrap((byte[]) defaultValue));
+    } else if (defaultValue instanceof ByteBuffer) {
+      return Expressions.lit((ByteBuffer) defaultValue);
+    }
+    return null;
+  }
+
+  private static Literal<?> convertUuidDefault(Object defaultValue) {
+    if (defaultValue instanceof java.util.UUID) {
+      return Expressions.lit((java.util.UUID) defaultValue);
+    } else if (defaultValue instanceof String) {
+      return Expressions.lit(java.util.UUID.fromString((String) defaultValue));
+    }
+    return null;
+  }
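The Connect Time and Timestamp logical types carry java.util.Date values in milliseconds, while Iceberg time/timestamp literals are microsecond-based, hence the * 1000 conversions above. A quick standalone check of that arithmetic:

public class TemporalUnits {
  public static void main(String[] args) {
    // Kafka Connect Time: java.util.Date holding millis since midnight (1 hour here).
    java.util.Date oneHour = new java.util.Date(3_600_000L);
    long micros = oneHour.getTime() * 1000; // Iceberg TIME/TIMESTAMP use microseconds
    System.out.println(micros); // 3600000000
  }
}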
 
   static class SchemaGenerator {
 
     private int fieldId = 1;
     private final IcebergSinkConfig config;
+    private final int formatVersion;
 
-    SchemaGenerator(IcebergSinkConfig config) {
+    SchemaGenerator(IcebergSinkConfig config, int formatVersion) {
       this.config = config;
+      this.formatVersion = formatVersion;
     }
 
     @SuppressWarnings("checkstyle:CyclomaticComplexity")
@@ -275,14 +460,33 @@ Type toIcebergType(Schema valueSchema) {
         List<NestedField> structFields =
             valueSchema.fields().stream()
                 .map(
-                    field ->
-                        NestedField.builder()
-                            .isOptional(
-                                config.schemaForceOptional() || field.schema().isOptional())
-                            .withId(nextId())
-                            .ofType(toIcebergType(field.schema()))
-                            .withName(field.name())
-                            .build())
+                    field -> {
+                      Type fieldType = toIcebergType(field.schema());
+                      NestedField.Builder builder =
+                          NestedField.builder()
+                              .isOptional(
+                                  config.schemaForceOptional() || field.schema().isOptional())
+                              .withId(nextId())
+                              .ofType(fieldType)
+                              .withName(field.name());
+
+                      // Apply the default only if the table format version is at least the
+                      // minimum version that supports default values, which is 3
+                      if (formatVersion >= IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION) {
+                        // Extract default value from Kafka Connect schema if present
+                        Object defaultValue = field.schema().defaultValue();
+                        if (defaultValue != null) {
+                          Literal<?> defaultLiteral =
+                              convertDefaultValue(defaultValue, fieldType);
+                          if (defaultLiteral != null) {
+                            builder.withInitialDefault(defaultLiteral);
+                            builder.withWriteDefault(defaultLiteral);
+                          }
+                        }
+                      }
+
+                      return builder.build();
+                    })
                .collect(Collectors.toList());
         return StructType.of(structFields);
       case STRING:
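End-to-end, the generator's version gate behaves as in this same-package sketch, which stubs IcebergSinkConfig with Mockito in the style of the tests below:

package org.apache.iceberg.connect.data;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.types.Type;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

class GeneratorVersionGateExample {
  public static void main(String[] args) {
    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
    when(config.schemaForceOptional()).thenReturn(false);

    Schema connectSchema =
        SchemaBuilder.struct()
            .field("name", SchemaBuilder.string().defaultValue("unknown").build())
            .build();

    Type v3 = SchemaUtils.toIcebergType(connectSchema, config, 3);
    System.out.println(v3.asStructType().field("name").initialDefault()); // unknown

    Type v2 = SchemaUtils.toIcebergType(connectSchema, config, 2);
    System.out.println(v2.asStructType().field("name").initialDefault()); // null below v3
  }
}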
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
index 22b3c6d53537..84b58657936b 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUpdate.java
@@ -20,6 +20,8 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
 
@@ -37,6 +39,51 @@ public void testAddColumn() {
     assertThat(addColumn.parentName()).isEqualTo("parent");
     assertThat(addColumn.name()).isEqualTo("name");
     assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+    assertThat(addColumn.defaultValue()).isNull();
+  }
+
+  @Test
+  public void testAddColumnWithDefaultValue() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    Literal<?> defaultValue = Expressions.lit("default_value");
+    updateConsumer.addColumn("parent", "name", Types.StringType.get(), defaultValue);
+    assertThat(updateConsumer.addColumns()).hasSize(1);
+    assertThat(updateConsumer.updateTypes()).isEmpty();
+    assertThat(updateConsumer.makeOptionals()).isEmpty();
+
+    SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
+    assertThat(addColumn.parentName()).isEqualTo("parent");
+    assertThat(addColumn.name()).isEqualTo("name");
+    assertThat(addColumn.type()).isEqualTo(Types.StringType.get());
+    assertThat(addColumn.defaultValue()).isEqualTo(defaultValue);
+    assertThat(addColumn.defaultValue().value()).isEqualTo("default_value");
+  }
+
+  @Test
+  public void testAddColumnWithNumericDefaultValue() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    Literal<?> defaultValue = Expressions.lit(42);
+    updateConsumer.addColumn(null, "age", Types.IntegerType.get(), defaultValue);
+
+    SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
+    assertThat(addColumn.parentName()).isNull();
+    assertThat(addColumn.name()).isEqualTo("age");
+    assertThat(addColumn.type()).isEqualTo(Types.IntegerType.get());
+    assertThat(addColumn.defaultValue()).isEqualTo(defaultValue);
+    assertThat(addColumn.defaultValue().value()).isEqualTo(42);
+  }
+
+  @Test
+  public void testAddColumnWithBooleanDefaultValue() {
+    SchemaUpdate.Consumer updateConsumer = new SchemaUpdate.Consumer();
+    Literal<?> defaultValue = Expressions.lit(true);
+    updateConsumer.addColumn(null, "active", Types.BooleanType.get(), defaultValue);
+
+    SchemaUpdate.AddColumn addColumn = updateConsumer.addColumns().iterator().next();
+    assertThat(addColumn.name()).isEqualTo("active");
+    assertThat(addColumn.type()).isEqualTo(Types.BooleanType.get());
+    assertThat(addColumn.defaultValue()).isEqualTo(defaultValue);
+    assertThat(addColumn.defaultValue().value()).isEqualTo(true);
   }
 
   @Test
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
index bde2452128b9..b54ddb937b1d 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestSchemaUtils.java
@@ -113,13 +113,13 @@ public void testApplySchemaUpdates() {
     verify(table).refresh();
     verify(table).updateSchema();
-    verify(updateSchema).addColumn(isNull(), eq("s"), isA(StringType.class));
+    verify(updateSchema).addColumn(isNull(), eq("s"), isA(StringType.class), isNull(), isNull());
     verify(updateSchema).updateColumn(eq("f"), isA(DoubleType.class));
     verify(updateSchema).makeColumnOptional(eq("i"));
     verify(updateSchema).commit();
 
     // check that there are no unexpected invocations...
-    verify(updateSchema).addColumn(isNull(), anyString(), any());
+    verify(updateSchema).addColumn(isNull(), anyString(), any(), isNull(), any());
     verify(updateSchema).updateColumn(any(), any());
     verify(updateSchema).makeColumnOptional(any());
   }
@@ -143,13 +143,13 @@ public void testApplyNestedSchemaUpdates() {
     verify(table).refresh();
     verify(table).updateSchema();
-    verify(updateSchema).addColumn(eq("st"), eq("s"), isA(StringType.class));
+    verify(updateSchema).addColumn(eq("st"), eq("s"), isA(StringType.class), isNull(), isNull());
     verify(updateSchema).updateColumn(eq("st.f"), isA(DoubleType.class));
     verify(updateSchema).makeColumnOptional(eq("st.i"));
     verify(updateSchema).commit();
 
     // check that there are no unexpected invocations...
-    verify(updateSchema).addColumn(anyString(), anyString(), any());
+    verify(updateSchema).addColumn(anyString(), anyString(), any(), isNull(), any());
     verify(updateSchema).updateColumn(any(), any());
     verify(updateSchema).makeColumnOptional(any());
   }
@@ -220,43 +220,51 @@ public void testToIcebergType(boolean forceOptional) {
     IcebergSinkConfig config = mock(IcebergSinkConfig.class);
     when(config.schemaForceOptional()).thenReturn(forceOptional);
 
-    assertThat(SchemaUtils.toIcebergType(Schema.BOOLEAN_SCHEMA, config))
+    int formatVersion = 2; // Use format version 2 for basic type conversion tests
+
+    assertThat(SchemaUtils.toIcebergType(Schema.BOOLEAN_SCHEMA, config, formatVersion))
         .isInstanceOf(BooleanType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.BYTES_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.BYTES_SCHEMA, config, formatVersion))
         .isInstanceOf(BinaryType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.INT8_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.INT8_SCHEMA, config, formatVersion))
        .isInstanceOf(IntegerType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.INT16_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.INT16_SCHEMA, config, formatVersion))
        .isInstanceOf(IntegerType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.INT32_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.INT32_SCHEMA, config, formatVersion))
        .isInstanceOf(IntegerType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.INT64_SCHEMA, config)).isInstanceOf(LongType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.FLOAT32_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.INT64_SCHEMA, config, formatVersion))
+        .isInstanceOf(LongType.class);
+    assertThat(SchemaUtils.toIcebergType(Schema.FLOAT32_SCHEMA, config, formatVersion))
        .isInstanceOf(FloatType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.FLOAT64_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.FLOAT64_SCHEMA, config, formatVersion))
        .isInstanceOf(DoubleType.class);
-    assertThat(SchemaUtils.toIcebergType(Schema.STRING_SCHEMA, config))
+    assertThat(SchemaUtils.toIcebergType(Schema.STRING_SCHEMA, config, formatVersion))
        .isInstanceOf(StringType.class);
-    assertThat(SchemaUtils.toIcebergType(Date.SCHEMA, config)).isInstanceOf(DateType.class);
-    assertThat(SchemaUtils.toIcebergType(Time.SCHEMA, config)).isInstanceOf(TimeType.class);
+    assertThat(SchemaUtils.toIcebergType(Date.SCHEMA, config, formatVersion))
+        .isInstanceOf(DateType.class);
+    assertThat(SchemaUtils.toIcebergType(Time.SCHEMA, config, formatVersion))
+        .isInstanceOf(TimeType.class);
 
-    Type timestampType = SchemaUtils.toIcebergType(Timestamp.SCHEMA, config);
+    Type timestampType = SchemaUtils.toIcebergType(Timestamp.SCHEMA, config, formatVersion);
     assertThat(timestampType).isInstanceOf(TimestampType.class);
     assertThat(((TimestampType) timestampType).shouldAdjustToUTC()).isTrue();
 
-    Type decimalType = SchemaUtils.toIcebergType(Decimal.schema(4), config);
+    Type decimalType = SchemaUtils.toIcebergType(Decimal.schema(4), config, formatVersion);
     assertThat(decimalType).isInstanceOf(DecimalType.class);
     assertThat(((DecimalType) decimalType).scale()).isEqualTo(4);
 
     Type listType =
-        SchemaUtils.toIcebergType(SchemaBuilder.array(Schema.STRING_SCHEMA).build(), config);
+        SchemaUtils.toIcebergType(
+            SchemaBuilder.array(Schema.STRING_SCHEMA).build(), config, formatVersion);
     assertThat(listType).isInstanceOf(ListType.class);
     assertThat(listType.asListType().elementType()).isInstanceOf(StringType.class);
     assertThat(listType.asListType().isElementOptional()).isEqualTo(forceOptional);
 
     Type mapType =
         SchemaUtils.toIcebergType(
-            SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build(), config);
+            SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build(),
+            config,
+            formatVersion);
     assertThat(mapType).isInstanceOf(MapType.class);
     assertThat(mapType.asMapType().keyType()).isInstanceOf(StringType.class);
     assertThat(mapType.asMapType().valueType()).isInstanceOf(StringType.class);
@@ -264,7 +272,7 @@ public void testToIcebergType(boolean forceOptional) {
 
     Type structType =
         SchemaUtils.toIcebergType(
-            SchemaBuilder.struct().field("i", Schema.INT32_SCHEMA).build(), config);
+            SchemaBuilder.struct().field("i", Schema.INT32_SCHEMA).build(), config, formatVersion);
     assertThat(structType).isInstanceOf(StructType.class);
     assertThat(structType.asStructType().fieldType("i")).isInstanceOf(IntegerType.class);
     assertThat(structType.asStructType().field("i").isOptional()).isEqualTo(forceOptional);
@@ -331,4 +339,245 @@ public void testInferIcebergTypeEmpty() {
     assertThat(SchemaUtils.inferIcebergType(ImmutableMap.of("nested", ImmutableMap.of()), config))
         .isNull();
   }
+
+  @Test
+  public void testConvertDefaultValuePrimitiveTypes() {
+    // Test boolean
+    org.apache.iceberg.expressions.Literal<?> boolLit =
+        SchemaUtils.convertDefaultValue(true, BooleanType.get());
+    assertThat(boolLit).isNotNull();
+    assertThat(boolLit.value()).isEqualTo(true);
+
+    // Test integer
+    org.apache.iceberg.expressions.Literal<?> intLit =
+        SchemaUtils.convertDefaultValue(42, IntegerType.get());
+    assertThat(intLit).isNotNull();
+    assertThat(intLit.value()).isEqualTo(42);
+
+    // Test long
+    org.apache.iceberg.expressions.Literal<?> longLit =
+        SchemaUtils.convertDefaultValue(1234567890L, LongType.get());
+    assertThat(longLit).isNotNull();
+    assertThat(longLit.value()).isEqualTo(1234567890L);
+
+    // Test float
+    org.apache.iceberg.expressions.Literal<?> floatLit =
+        SchemaUtils.convertDefaultValue(3.14f, FloatType.get());
+    assertThat(floatLit).isNotNull();
+    assertThat(floatLit.value()).isEqualTo(3.14f);
+
+    // Test double
+    org.apache.iceberg.expressions.Literal<?> doubleLit =
+        SchemaUtils.convertDefaultValue(2.71828, DoubleType.get());
+    assertThat(doubleLit).isNotNull();
+    assertThat(doubleLit.value()).isEqualTo(2.71828);
+
+    // Test string
+    org.apache.iceberg.expressions.Literal<?> stringLit =
+        SchemaUtils.convertDefaultValue("default_value", StringType.get());
+    assertThat(stringLit).isNotNull();
+    assertThat(stringLit.value()).isEqualTo("default_value");
+  }
+
+  @Test
+  public void testConvertDefaultValueDecimal() {
+    BigDecimal value = new BigDecimal("12.34");
+
+    org.apache.iceberg.expressions.Literal<?> decimalLit =
+        SchemaUtils.convertDefaultValue(value, DecimalType.of(10, 2));
+    assertThat(decimalLit).isNotNull();
+    assertThat(decimalLit.value()).isEqualTo(value);
+  }
+
+  @Test
+  public void testConvertDefaultValueNull() {
+    org.apache.iceberg.expressions.Literal<?> nullLit =
+        SchemaUtils.convertDefaultValue(null, StringType.get());
+    assertThat(nullLit).isNull();
+  }
+
+  @Test
+  public void testConvertDefaultValueNestedTypes() {
+    // Nested types (LIST, MAP, STRUCT) should return null for non-null defaults
+    org.apache.iceberg.expressions.Literal<?> listLit =
+        SchemaUtils.convertDefaultValue(
+            ImmutableList.of("value"), ListType.ofOptional(1, StringType.get()));
+    assertThat(listLit).isNull();
+  }
+
+  @Test
+  public void testConvertDefaultValueWithNumberConversion() {
+    // Test that a Number can be converted to different types
+    org.apache.iceberg.expressions.Literal<?> intFromLong =
+        SchemaUtils.convertDefaultValue(100L, IntegerType.get());
+    assertThat(intFromLong).isNotNull();
+    assertThat(intFromLong.value()).isEqualTo(100);
+
+    org.apache.iceberg.expressions.Literal<?> longFromInt =
+        SchemaUtils.convertDefaultValue(50, LongType.get());
+    assertThat(longFromInt).isNotNull();
+    assertThat(longFromInt.value()).isEqualTo(50L);
+
+    org.apache.iceberg.expressions.Literal<?> floatFromDouble =
+        SchemaUtils.convertDefaultValue(1.5, FloatType.get());
+    assertThat(floatFromDouble).isNotNull();
+    assertThat(floatFromDouble.value()).isEqualTo(1.5f);
+  }
+
+  @Test
+  public void testConvertDefaultValueTemporalTypes() {
+    // Test DATE - should convert to days from epoch
+    java.util.Date date = new java.util.Date(0); // Epoch
+    org.apache.iceberg.expressions.Literal<?> dateLit =
+        SchemaUtils.convertDefaultValue(date, DateType.get());
+    assertThat(dateLit).isNotNull();
+    assertThat(dateLit.value()).isEqualTo(0); // Day 0 (1970-01-01)
+
+    // Test DATE from LocalDate
+    LocalDate localDate = LocalDate.of(2024, 1, 1);
+    org.apache.iceberg.expressions.Literal<?> localDateLit =
+        SchemaUtils.convertDefaultValue(localDate, DateType.get());
+    assertThat(localDateLit).isNotNull();
+    assertThat(localDateLit.value()).isEqualTo((int) localDate.toEpochDay());
+
+    // Test TIME - should convert to microseconds from midnight
+    java.util.Date time = new java.util.Date(3600000); // 1 hour in millis
+    org.apache.iceberg.expressions.Literal<?> timeLit =
+        SchemaUtils.convertDefaultValue(time, TimeType.get());
+    assertThat(timeLit).isNotNull();
+    assertThat(timeLit.value()).isEqualTo(3600000L * 1000); // Converted to micros
+
+    // Test TIMESTAMP - should use the micros() factory
+    java.util.Date timestamp = new java.util.Date(1000); // 1 second
+    org.apache.iceberg.expressions.Literal<?> timestampLit =
+        SchemaUtils.convertDefaultValue(timestamp, TimestampType.withZone());
+    assertThat(timestampLit).isNotNull();
+    // The value should be in microseconds (1000 millis = 1000000 micros)
+    assertThat(timestampLit.value()).isEqualTo(1000L * 1000);
+  }
+
+  @Test
+  public void testSchemaGeneratorExtractsDefaultsV3() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.schemaForceOptional()).thenReturn(false);
+
+    Schema kafkaSchema =
+        SchemaBuilder.struct()
+            .field("id", Schema.INT32_SCHEMA)
+            .field("name", SchemaBuilder.string().defaultValue("unknown").build())
+            .field("age", SchemaBuilder.int32().defaultValue(0).build())
+            .field("active", SchemaBuilder.bool().defaultValue(true).build())
+            .build();
+
+    // Test with format version 3 - should extract defaults
+    Type icebergType = SchemaUtils.toIcebergType(kafkaSchema, config, 3);
+    assertThat(icebergType).isInstanceOf(StructType.class);
+
+    StructType structType = (StructType) icebergType;
+    assertThat(structType.fields()).hasSize(4);
+
+    // Look up fields by name (not index); "id" has no default
+    NestedField idField = structType.field("id");
+    assertThat(idField.name()).isEqualTo("id");
+    assertThat(idField.initialDefault()).isNull();
+    assertThat(idField.writeDefault()).isNull();
+
+    // Field with string default
+    NestedField nameField = structType.field("name");
+    assertThat(nameField.name()).isEqualTo("name");
+    assertThat(nameField.initialDefault()).isEqualTo("unknown");
+    assertThat(nameField.writeDefault()).isEqualTo("unknown");
+
+    // Field with integer default
+    NestedField ageField = structType.field("age");
+    assertThat(ageField.name()).isEqualTo("age");
+    assertThat(ageField.initialDefault()).isEqualTo(0);
+    assertThat(ageField.writeDefault()).isEqualTo(0);
+
+    // Field with boolean default
+    NestedField activeField = structType.field("active");
+    assertThat(activeField.name()).isEqualTo("active");
+    assertThat(activeField.initialDefault()).isEqualTo(true);
+    assertThat(activeField.writeDefault()).isEqualTo(true);
+  }
+
+  @Test
+  public void testSchemaGeneratorIgnoresDefaultsV2() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.schemaForceOptional()).thenReturn(false);
+
+    Schema kafkaSchema =
+        SchemaBuilder.struct()
+            .field("id", Schema.INT32_SCHEMA)
+            .field("name", SchemaBuilder.string().defaultValue("unknown").build())
+            .field("age", SchemaBuilder.int32().defaultValue(0).build())
+            .field("active", SchemaBuilder.bool().defaultValue(true).build())
+            .build();
+
+    // Test with format version 2 - should NOT extract defaults
+    Type icebergType = SchemaUtils.toIcebergType(kafkaSchema, config, 2);
+    assertThat(icebergType).isInstanceOf(StructType.class);
+
+    StructType structType = (StructType) icebergType;
+    assertThat(structType.fields()).hasSize(4);
+
+    // All fields should have null defaults when using format version 2
+    for (NestedField field : structType.fields()) {
+      assertThat(field.initialDefault())
+          .as("Field %s should not have initial default in format v2", field.name())
+          .isNull();
+      assertThat(field.writeDefault())
+          .as("Field %s should not have write default in format v2", field.name())
+          .isNull();
+    }
+  }
+
+  @Test
+  public void testSchemaGeneratorWithDifferentFormatVersions() {
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.schemaForceOptional()).thenReturn(false);
+
+    Schema kafkaSchema =
+        SchemaBuilder.struct()
+            .field("name", SchemaBuilder.string().defaultValue("test").build())
+            .build();
+
+    // Format version 1 - no defaults
+    Type v1Type = SchemaUtils.toIcebergType(kafkaSchema, config, 1);
+    assertThat(v1Type.asStructType().field("name").initialDefault()).isNull();
+    assertThat(v1Type.asStructType().field("name").writeDefault()).isNull();
+
+    // Format version 2 - no defaults
+    Type v2Type = SchemaUtils.toIcebergType(kafkaSchema, config, 2);
+    assertThat(v2Type.asStructType().field("name").initialDefault()).isNull();
+    assertThat(v2Type.asStructType().field("name").writeDefault()).isNull();
+
+    // Format version 3 - with defaults
+    Type v3Type = SchemaUtils.toIcebergType(kafkaSchema, config, 3);
+    assertThat(v3Type.asStructType().field("name").initialDefault()).isEqualTo("test");
+    assertThat(v3Type.asStructType().field("name").writeDefault()).isEqualTo("test");
+  }
+
+  @Test
+  public void testFormatVersionFromTableUsedForSchemaEvolution() {
+    // This test verifies that the table's format version is used, not the config
+    IcebergSinkConfig config = mock(IcebergSinkConfig.class);
+    when(config.schemaForceOptional()).thenReturn(false);
+
+    Schema kafkaSchemaWithDefaults =
+        SchemaBuilder.struct()
+            .field("newColumn", SchemaBuilder.string().defaultValue("default_value").build())
+            .build();
+
+    // When the table is v3, defaults should be included regardless of config
+    Type typeFromV3Table = SchemaUtils.toIcebergType(kafkaSchemaWithDefaults, config, 3);
+    assertThat(typeFromV3Table.asStructType().field("newColumn").initialDefault())
+        .isEqualTo("default_value");
+
+    // When the table is v2, defaults should NOT be included
+    Type typeFromV2Table = SchemaUtils.toIcebergType(kafkaSchemaWithDefaults, config, 2);
+    assertThat(typeFromV2Table.asStructType().field("newColumn").initialDefault()).isNull();
+  }
 }
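To exercise the whole path, the connector would need a v3 table (auto-created or pre-existing) plus schema evolution enabled. A hedged illustration of the relevant settings expressed as a plain Java map; the key names follow the sink's documented iceberg.tables.* options and should be verified against the docs for your connector version:

import java.util.Map;

public class ConnectorProps {
  public static void main(String[] args) {
    Map<String, String> props =
        Map.of(
            "iceberg.tables.auto-create-enabled", "true",
            // Auto-created tables get format v3, enabling default-value propagation:
            "iceberg.tables.auto-create-props.format-version", "3",
            "iceberg.tables.evolve-schema-enabled", "true");
    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}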