@@ -107,6 +107,8 @@ public class IcebergSinkConfig extends AbstractConfig {

@VisibleForTesting static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

public static final int DEFAULT_VALUE_MIN_FORMAT_VERSION = 3;

public static final ConfigDef CONFIG_DEF = newConfigDef();

public static String version() {
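The new `DEFAULT_VALUE_MIN_FORMAT_VERSION` constant encodes the minimum Iceberg table format version (v3) that supports column default values. A minimal sketch of the intended gate, assuming the caller has already resolved the table's format version (the value below is hypothetical):

```java
import org.apache.iceberg.connect.IcebergSinkConfig;

public class FormatVersionGateSketch {
  public static void main(String[] args) {
    int tableFormatVersion = 2; // hypothetical; -1 is used elsewhere to mean "unknown"
    if (tableFormatVersion >= IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION) {
      System.out.println("format v3+: column defaults can be propagated");
    } else {
      System.out.println("format v" + tableFormatVersion + ": defaults are skipped");
    }
  }
}
```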
@@ -23,6 +23,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -72,6 +73,11 @@ RecordWriter createWriter(String tableName, SinkRecord sample, boolean ignoreMis

@VisibleForTesting
Table autoCreateTable(String tableName, SinkRecord sample) {
// Determine the format version that will be used for the table
int formatVersion =
Integer.parseInt(
config.autoCreateProps().getOrDefault(TableProperties.FORMAT_VERSION, "-1"));

StructType structType;
if (sample.valueSchema() == null) {
Type type = SchemaUtils.inferIcebergType(sample.value(), config);
@@ -80,7 +86,8 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
}
structType = type.asStructType();
} else {
structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
structType =
SchemaUtils.toIcebergType(sample.valueSchema(), config, formatVersion).asStructType();
}

org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
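For reference, a self-contained sketch of the fallback used in `autoCreateTable`, with a plain map standing in for `config.autoCreateProps()`. `TableProperties.FORMAT_VERSION` is the standard `format-version` property key; note that a non-numeric user-supplied value would surface here as a `NumberFormatException`:

```java
import java.util.Map;
import org.apache.iceberg.TableProperties;

public class AutoCreateFormatVersionSketch {
  public static void main(String[] args) {
    // Stand-in for config.autoCreateProps(); the user did not request a format version.
    Map<String, String> autoCreateProps = Map.of("write.format.default", "parquet");

    // Same lookup as above: -1 signals that no format version was specified.
    int formatVersion =
        Integer.parseInt(autoCreateProps.getOrDefault(TableProperties.FORMAT_VERSION, "-1"));
    System.out.println(formatVersion); // prints -1
  }
}
```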
@@ -45,6 +45,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -66,8 +67,12 @@
import org.apache.iceberg.util.UUIDUtil;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RecordConverter {
private static final Logger LOG = LoggerFactory.getLogger(RecordConverter.class);

private static final ObjectMapper MAPPER = new ObjectMapper();

@@ -81,11 +86,21 @@ class RecordConverter {
private final NameMapping nameMapping;
private final IcebergSinkConfig config;
private final Map<Integer, Map<String, NestedField>> structNameMap = Maps.newHashMap();
private final int tableFormatVersion;

RecordConverter(Table table, IcebergSinkConfig config) {
this.tableSchema = table.schema();
this.nameMapping = createNameMapping(table);
this.config = config;

int formatVersion;
try {
formatVersion = TableUtil.formatVersion(table);
} catch (Exception ex) {
LOG.error("Failed to retrieve format version from table {}", table.name(), ex);
formatVersion = -1;
}
this.tableFormatVersion = formatVersion;
}

Record convert(Object data) {
@@ -217,8 +232,23 @@ private GenericRecord convertToStruct(
if (schemaUpdateConsumer != null) {
String parentFieldName =
structFieldId < 0 ? null : tableSchema.findColumnName(structFieldId);
Type type = SchemaUtils.toIcebergType(recordField.schema(), config);
schemaUpdateConsumer.addColumn(parentFieldName, recordField.name(), type);
Type type =
SchemaUtils.toIcebergType(recordField.schema(), config, tableFormatVersion);
org.apache.iceberg.expressions.Literal<?> defaultLiteral = null;
if (tableFormatVersion >= IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION) {
// Extract default value from Kafka Connect schema if present
Object defaultValue = recordField.schema().defaultValue();
if (defaultValue != null) {
defaultLiteral = SchemaUtils.convertDefaultValue(defaultValue, type);
}
} else {
LOG.info(
"Format version ({}) < min format version ({}) required for default value support",
tableFormatVersion,
IcebergSinkConfig.DEFAULT_VALUE_MIN_FORMAT_VERSION);
}
schemaUpdateConsumer.addColumn(
parentFieldName, recordField.name(), type, defaultLiteral);
}
} else {
boolean hasSchemaUpdates = false;
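`SchemaUtils.convertDefaultValue` is this PR's helper and its body is not part of this diff. The sketch below shows one plausible shape for such a conversion using Iceberg's existing `Literal` factories, covering only a few primitive Connect default types:

```java
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class DefaultLiteralSketch {
  // Assumption: roughly what a convertDefaultValue(Object, Type) helper might do for primitives.
  static Literal<?> toLiteral(Object connectDefault, Type icebergType) {
    if (connectDefault instanceof CharSequence) {
      return Literal.of((CharSequence) connectDefault).to(icebergType);
    } else if (connectDefault instanceof Integer) {
      return Literal.of((Integer) connectDefault).to(icebergType);
    } else if (connectDefault instanceof Long) {
      return Literal.of((Long) connectDefault).to(icebergType);
    } else if (connectDefault instanceof Boolean) {
      return Literal.of((Boolean) connectDefault).to(icebergType);
    }
    throw new IllegalArgumentException("Unsupported default type: " + connectDefault.getClass());
  }

  public static void main(String[] args) {
    Literal<?> lit = toLiteral("n/a", Types.StringType.get());
    System.out.println(lit); // prints the bound string literal
  }
}
```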
@@ -385,7 +415,13 @@ protected String convertString(Object value) {
return MAPPER.writeValueAsString(value);
} else if (value instanceof Struct) {
Struct struct = (Struct) value;
byte[] data = config.jsonConverter().fromConnectData(null, struct.schema(), struct);
byte[] data;
try (JsonConverter jsonConverter = config.jsonConverter()) {
data = jsonConverter.fromConnectData(null, struct.schema(), struct);
} catch (Exception ex) {
LOG.error("Error while converting string of type struct", ex);
throw ex;
}
return new String(data, StandardCharsets.UTF_8);
}
} catch (IOException e) {
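`JsonConverter` implements `HeaderConverter` and is therefore `Closeable`, which is what lets it participate in the try-with-resources above. A standalone sketch of the same struct-to-JSON conversion, configuring a local converter rather than reusing the one returned by `config.jsonConverter()`:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class StructToJsonSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = SchemaBuilder.struct().field("id", Schema.INT32_SCHEMA).build();
    Struct struct = new Struct(schema).put("id", 7);

    // JsonConverter is Closeable, so try-with-resources works here as well.
    try (JsonConverter converter = new JsonConverter()) {
      converter.configure(Map.of("schemas.enable", false), false); // value converter, no envelope
      byte[] data = converter.fromConnectData(null, schema, struct);
      System.out.println(new String(data, StandardCharsets.UTF_8)); // {"id":7}
    }
  }
}
```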
@@ -20,6 +20,7 @@

import java.util.Collection;
import java.util.Map;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Type.PrimitiveType;
@@ -48,7 +49,11 @@ boolean empty() {
}

void addColumn(String parentName, String name, Type type) {
AddColumn addCol = new AddColumn(parentName, name, type);
addColumn(parentName, name, type, null);
}

void addColumn(String parentName, String name, Type type, Literal<?> defaultValue) {
AddColumn addCol = new AddColumn(parentName, name, type, defaultValue);
addColumns.put(addCol.key(), addCol);
}

@@ -65,11 +70,13 @@ static class AddColumn extends SchemaUpdate {
private final String parentName;
private final String name;
private final Type type;
private final Literal<?> defaultValue;

AddColumn(String parentName, String name, Type type) {
AddColumn(String parentName, String name, Type type, Literal<?> defaultValue) {
this.parentName = parentName;
this.name = name;
this.type = type;
this.defaultValue = defaultValue;
}

String parentName() {
@@ -87,6 +94,10 @@ String key() {
Type type() {
return type;
}

Literal<?> defaultValue() {
return defaultValue;
}
}

static class UpdateType extends SchemaUpdate {
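A usage sketch for the extended `AddColumn`. The constructor and accessors are the ones shown in this diff; the column name and default are hypothetical, and the snippet assumes it compiles in the same package as `SchemaUpdate`, since the constructor is package-private:

```java
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Types;

public class AddColumnDefaultSketch {
  public static void main(String[] args) {
    // Hypothetical new top-level column "status" with a default of "unknown".
    SchemaUpdate.AddColumn addCol =
        new SchemaUpdate.AddColumn(null, "status", Types.StringType.get(), Literal.of("unknown"));

    // The commit path can check defaultValue() and only attach a default when the table's
    // format version is at least DEFAULT_VALUE_MIN_FORMAT_VERSION (v3).
    System.out.println(addCol.key() + " -> " + addCol.defaultValue());
  }
}
```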