Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@
*/
public abstract class AbstractTypeAdapter<S> implements TypeAdapter<S> {


@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
@Override
public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
public Object convert(Object sourceValue, S sourceSchema, Type targetType, StructConverter<S> structConverter) {
if (sourceValue == null) {
return null;
}
Expand Down Expand Up @@ -86,6 +87,8 @@ public Object convert(Object sourceValue, S sourceSchema, Type targetType) {
return convertList(sourceValue, sourceSchema, (Types.ListType) targetType);
case MAP:
return convertMap(sourceValue, sourceSchema, (Types.MapType) targetType);
case STRUCT:
return structConverter.convert(sourceValue, sourceSchema, targetType);
default:
return sourceValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,26 @@ protected List<?> convertList(Object sourceValue, Schema sourceSchema, Types.Lis
}
return adaptedMap;
}

/**
 * Three-argument entry point of the type-adapter contract: delegates to the
 * four-argument overload, supplying this class's struct conversion
 * ({@code convertStruct}) as the callback used for nested STRUCT values.
 */
@Override
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
    return convert(sourceValue, sourceSchema, targetType, this::convertStruct);
}

/**
 * Converts an Avro record value into an Iceberg {@code GenericRecord} matching the
 * target struct type. Each field of the target struct is looked up by name in the
 * source schema and converted recursively via {@code convert(Object, Schema, Type)}.
 *
 * @param sourceValue  the source value; expected to be an Avro {@link GenericRecord}
 * @param sourceSchema the Avro schema describing {@code sourceValue}
 * @param targetType   the Iceberg struct type to convert to
 * @return a populated {@code org.apache.iceberg.data.GenericRecord}
 * @throws IllegalStateException if a target field has no counterpart in the source schema
 */
protected Object convertStruct(Object sourceValue, Schema sourceSchema, Type targetType) {
    org.apache.iceberg.Schema schema = targetType.asStructType().asSchema();
    org.apache.iceberg.data.GenericRecord result = org.apache.iceberg.data.GenericRecord.create(schema);
    // The cast is loop-invariant: perform it once rather than once per field.
    GenericRecord record = (GenericRecord) sourceValue;
    for (Types.NestedField f : schema.columns()) {
        Schema.Field sourceField = sourceSchema.getField(f.name());
        if (sourceField == null) {
            throw new IllegalStateException("Missing field '" + f.name()
                + "' in source schema: " + sourceSchema.getFullName());
        }
        // Convert the field value to the Iceberg-expected representation.
        Object fieldValue = convert(record.get(f.name()), sourceField.schema(), f.type());
        result.setField(f.name(), fieldValue);
    }
    return result;
}
}
57 changes: 57 additions & 0 deletions core/src/main/java/kafka/automq/table/binder/FieldMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025, AutoMQ HK Limited.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.automq.table.binder;

import org.apache.avro.Schema;
import org.apache.iceberg.types.Type;

/**
* Represents the mapping between an Avro field and its corresponding Iceberg field.
* This class stores the position, key, schema, and type information needed to
* convert field values during record binding.
*/
/**
 * Immutable value holder describing how a single Avro field maps onto its
 * corresponding Iceberg field: the Avro field's position and name, the target
 * Iceberg type, and the (possibly union-resolved) Avro schema of the field.
 *
 * <p>Declared {@code final}: this is a pure data carrier with no extension
 * points, so closing it to subclassing keeps its immutability guarantee.
 */
public final class FieldMapping {
    // Index of the field within the Avro record (used for positional access).
    private final int avroPosition;
    // Name of the field in the Avro schema.
    private final String avroKey;
    // Target Iceberg type the field value is converted to.
    private final Type icebergType;
    // Avro schema of the field; may be a union-resolved element schema.
    private final Schema avroSchema;

    public FieldMapping(int avroPosition, String avroKey, Type icebergType, Schema avroSchema) {
        this.avroPosition = avroPosition;
        this.avroKey = avroKey;
        this.icebergType = icebergType;
        this.avroSchema = avroSchema;
    }

    /** @return the field's position within the Avro record */
    public int avroPosition() {
        return avroPosition;
    }

    /** @return the field's name in the Avro schema */
    public String avroKey() {
        return avroKey;
    }

    /** @return the target Iceberg type */
    public Type icebergType() {
        return icebergType;
    }

    /** @return the Avro schema of the field */
    public Schema avroSchema() {
        return avroSchema;
    }
}
172 changes: 75 additions & 97 deletions core/src/main/java/kafka/automq/table/binder/RecordBinder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -48,7 +49,7 @@ public class RecordBinder {
private final FieldMapping[] fieldMappings;

// Pre-computed RecordBinders for nested STRUCT fields
private final Map<String, RecordBinder> nestedStructBinders;
private final Map<Schema, RecordBinder> nestedStructBinders;

// Field count statistics for this batch
private final AtomicLong batchFieldCount;
Expand Down Expand Up @@ -78,11 +79,9 @@ public RecordBinder(org.apache.iceberg.Schema icebergSchema, Schema avroSchema,
}

// Initialize field mappings
this.fieldMappings = new FieldMapping[icebergSchema.columns().size()];
initializeFieldMappings(avroSchema);

this.fieldMappings = buildFieldMappings(avroSchema, icebergSchema);
// Pre-compute nested struct binders
this.nestedStructBinders = precomputeNestedStructBinders(typeAdapter);
this.nestedStructBinders = precomputeBindersMap(typeAdapter);
}

public RecordBinder createBinderForNewSchema(org.apache.iceberg.Schema icebergSchema, Schema avroSchema) {
Expand Down Expand Up @@ -121,8 +120,9 @@ void addFieldCount(long count) {
batchFieldCount.addAndGet(count);
}

private void initializeFieldMappings(Schema avroSchema) {
private FieldMapping[] buildFieldMappings(Schema avroSchema, org.apache.iceberg.Schema icebergSchema) {
Schema recordSchema = avroSchema;
FieldMapping[] mappings = new FieldMapping[icebergSchema.columns().size()];

if (recordSchema.getType() == Schema.Type.UNION) {
recordSchema = recordSchema.getTypes().stream()
Expand All @@ -137,32 +137,28 @@ private void initializeFieldMappings(Schema avroSchema) {

Schema.Field avroField = recordSchema.getField(fieldName);
if (avroField != null) {
fieldMappings[icebergPos] = createOptimizedMapping(
mappings[icebergPos] = buildFieldMapping(
avroField.name(),
avroField.pos(),
icebergField.type(),
avroField.schema()
);
} else {
fieldMappings[icebergPos] = null;
mappings[icebergPos] = null;
}
}
return mappings;
}

private FieldMapping createOptimizedMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
org.apache.iceberg.Schema nestedSchema = null;
String nestedSchemaId = null;
if (icebergType.isStructType()) {
nestedSchema = icebergType.asStructType().asSchema();
nestedSchemaId = icebergType.toString();
}
private FieldMapping buildFieldMapping(String avroFieldName, int avroPosition, Type icebergType, Schema avroType) {
if (Type.TypeID.TIMESTAMP.equals(icebergType.typeId())
|| Type.TypeID.TIME.equals(icebergType.typeId())
|| Type.TypeID.MAP.equals(icebergType.typeId())
|| Type.TypeID.LIST.equals(icebergType.typeId())) {
|| Type.TypeID.LIST.equals(icebergType.typeId())
|| Type.TypeID.STRUCT.equals(icebergType.typeId())) {
avroType = resolveUnionElement(avroType);
}
return new FieldMapping(avroPosition, avroFieldName, icebergType, icebergType.typeId(), avroType, nestedSchema, nestedSchemaId);
return new FieldMapping(avroPosition, avroFieldName, icebergType, avroType);
}

private Schema resolveUnionElement(Schema schema) {
Expand All @@ -183,24 +179,55 @@ private Schema resolveUnionElement(Schema schema) {
/**
* Pre-computes RecordBinders for nested STRUCT fields.
*/
private Map<String, RecordBinder> precomputeNestedStructBinders(TypeAdapter<Schema> typeAdapter) {
Map<String, RecordBinder> binders = new HashMap<>();
private Map<Schema, RecordBinder> precomputeBindersMap(TypeAdapter<Schema> typeAdapter) {
Map<Schema, RecordBinder> binders = new IdentityHashMap<>();

for (FieldMapping mapping : fieldMappings) {
if (mapping != null && mapping.typeId() == Type.TypeID.STRUCT) {
String structId = mapping.nestedSchemaId();
if (!binders.containsKey(structId)) {
RecordBinder nestedBinder = new RecordBinder(
mapping.nestedSchema(),
if (mapping != null) {
Type type = mapping.icebergType();
if (type.isPrimitiveType()) {
} else if (type.isStructType()) {
org.apache.iceberg.Schema schema = type.asStructType().asSchema();
RecordBinder structBinder = new RecordBinder(
schema,
mapping.avroSchema(),
typeAdapter,
batchFieldCount
);
binders.put(structId, nestedBinder);
binders.put(mapping.avroSchema(), structBinder);
} else if (type.isListType()) {
Types.ListType listType = type.asListType();
Type elementType = listType.elementType();
if (elementType.isStructType()) {
org.apache.iceberg.Schema schema = elementType.asStructType().asSchema();
RecordBinder elementBinder = new RecordBinder(
schema,
mapping.avroSchema().getElementType(),
typeAdapter,
batchFieldCount
);
binders.put(mapping.avroSchema().getElementType(), elementBinder);
}
} else if (type.isMapType()) {
Types.MapType mapType = type.asMapType();
Type keyType = mapType.keyType();
Type valueType = mapType.valueType();
if (keyType.isStructType()) {
throw new UnsupportedOperationException("Struct keys in MAP types are not supported");
}
if (valueType.isStructType()) {
org.apache.iceberg.Schema schema = valueType.asStructType().asSchema();
RecordBinder valueBinder = new RecordBinder(
schema,
mapping.avroSchema().getValueType(),
typeAdapter,
batchFieldCount
);
binders.put(mapping.avroSchema().getValueType(), valueBinder);
}
}
}
}

return binders;
}

Expand All @@ -210,16 +237,16 @@ private static class AvroRecordView implements Record {
private final TypeAdapter<Schema> typeAdapter;
private final Map<String, Integer> fieldNameToPosition;
private final FieldMapping[] fieldMappings;
private final Map<String, RecordBinder> nestedStructBinders;
private final Map<Schema, RecordBinder> nestedStructBinders;
private final RecordBinder parentBinder;

AvroRecordView(GenericRecord avroRecord,
org.apache.iceberg.Schema icebergSchema,
TypeAdapter<Schema> typeAdapter,
Map<String, Integer> fieldNameToPosition,
FieldMapping[] fieldMappings,
Map<String, RecordBinder> nestedStructBinders,
RecordBinder parentBinder) {
org.apache.iceberg.Schema icebergSchema,
TypeAdapter<Schema> typeAdapter,
Map<String, Integer> fieldNameToPosition,
FieldMapping[] fieldMappings,
Map<Schema, RecordBinder> nestedStructBinders,
RecordBinder parentBinder) {
this.avroRecord = avroRecord;
this.icebergSchema = icebergSchema;
this.typeAdapter = typeAdapter;
Expand All @@ -242,25 +269,11 @@ public Object get(int pos) {
if (mapping == null) {
return null;
}

Object avroValue = avroRecord.get(mapping.avroPosition());
if (avroValue == null) {
return null;
}

// Handle STRUCT type - delegate to nested binder
if (mapping.typeId() == Type.TypeID.STRUCT) {
String structId = mapping.nestedSchemaId();
RecordBinder nestedBinder = nestedStructBinders.get(structId);
if (nestedBinder == null) {
throw new IllegalStateException("Nested binder not found for struct: " + structId);
}
parentBinder.addFieldCount(1);
return nestedBinder.bind((GenericRecord) avroValue);
}

// Convert non-STRUCT types
Object result = typeAdapter.convert(avroValue, mapping.avroSchema(), mapping.icebergType());
Object result = convert(avroValue, mapping.avroSchema(), mapping.icebergType());

// Calculate and accumulate field count
long fieldCount = calculateFieldCount(result, mapping.icebergType());
Expand All @@ -269,6 +282,17 @@ public Object get(int pos) {
return result;
}

/**
 * Converts a raw Avro value into its Iceberg representation.
 * STRUCT targets are delegated to a pre-computed nested {@code RecordBinder}
 * (looked up by source-schema identity in {@code nestedStructBinders}); all other
 * types go through the generic type adapter, which recurses back into this method
 * for structs nested inside lists and maps.
 *
 * @throws IllegalStateException if no nested binder was pre-computed for the schema
 */
public Object convert(Object sourceValue, Schema sourceSchema, Type targetType) {
    if (targetType.typeId() == Type.TypeID.STRUCT) {
        RecordBinder binder = nestedStructBinders.get(sourceSchema);
        if (binder == null) {
            throw new IllegalStateException("Missing nested binder for schema: " + sourceSchema);
        }
        return binder.bind((GenericRecord) sourceValue);
    }
    // sourceSchema is already declared as Schema; the original (Schema) cast was redundant.
    return typeAdapter.convert(sourceValue, sourceSchema, targetType, this::convert);
}

/**
* Calculates the field count for a converted value based on its size.
* Large fields are counted multiple times based on the size threshold.
Expand Down Expand Up @@ -358,66 +382,20 @@ public <T> T get(int pos, Class<T> javaClass) {
public void setField(String name, Object value) {
throw new UnsupportedOperationException("Read-only");
}

// This view is a read-only adapter over the underlying Avro record:
// all mutating / copying operations of the Record interface are rejected.

/** Unsupported: this record is a read-only view and cannot be copied. */
@Override
public Record copy() {
    throw new UnsupportedOperationException("Read-only");
}

/** Unsupported: this record is a read-only view and cannot be copied. */
@Override
public Record copy(Map<String, Object> overwriteValues) {
    throw new UnsupportedOperationException("Read-only");
}

/** Unsupported: this record is a read-only view and cannot be mutated. */
@Override
public <T> void set(int pos, T value) {
    throw new UnsupportedOperationException("Read-only");
}
}

/**
 * Mapping between an Avro field and its Iceberg counterpart: position, name,
 * target type, type id, the field's Avro schema, and — for STRUCT targets —
 * the nested Iceberg schema plus a string id used to key nested binders.
 *
 * NOTE(review): this duplicates the top-level kafka.automq.table.binder.FieldMapping
 * class — consider consolidating on one of the two.
 */
private static class FieldMapping {
    private final int avroPosition;
    private final String avroKey;
    private final Type icebergType;
    // Cached type id so hot paths avoid repeated icebergType.typeId() calls.
    private final Type.TypeID typeId;
    private final Schema avroSchema;
    // Non-null only when icebergType is a STRUCT; nested Iceberg schema of the struct.
    private final org.apache.iceberg.Schema nestedSchema;
    // String identifier of the nested struct type, used as a binder lookup key.
    private final String nestedSchemaId;

    FieldMapping(int avroPosition, String avroKey, Type icebergType, Type.TypeID typeId, Schema avroSchema, org.apache.iceberg.Schema nestedSchema, String nestedSchemaId) {
        this.avroPosition = avroPosition;
        this.avroKey = avroKey;
        this.icebergType = icebergType;
        this.typeId = typeId;
        this.avroSchema = avroSchema;
        this.nestedSchema = nestedSchema;
        this.nestedSchemaId = nestedSchemaId;
    }

    /** @return the field's position within the Avro record */
    public int avroPosition() {
        return avroPosition;
    }

    /** @return the field's name in the Avro schema */
    public String avroKey() {
        return avroKey;
    }

    /** @return the target Iceberg type */
    public Type icebergType() {
        return icebergType;
    }

    /** @return the cached Iceberg type id of {@link #icebergType()} */
    public Type.TypeID typeId() {
        return typeId;
    }

    /** @return the Avro schema of the field */
    public Schema avroSchema() {
        return avroSchema;
    }

    /** @return the nested Iceberg schema for STRUCT fields, or null otherwise */
    public org.apache.iceberg.Schema nestedSchema() {
        return nestedSchema;
    }

    /** @return the nested struct's string id for binder lookup, or null otherwise */
    public String nestedSchemaId() {
        return nestedSchemaId;
    }
}
}
Loading
Loading