Commit 03c9f4e
Fix field ID conflicts in RecordBatchTransformer.
1 parent d3d3127 commit 03c9f4e

1 file changed: crates/iceberg/src/arrow/record_batch_transformer.rs (+171, -24 lines)
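
For context, the conflict in schema terms: after an add_files import, a data file's Parquet field IDs can collide with the Iceberg table's field IDs while naming different columns. A minimal illustration follows (hypothetical snippet; `with_field_id` mirrors the test helper `simple_field` used below, and `PARQUET:field_id` is the metadata key the Arrow/Parquet integration uses to carry field IDs):

    use std::collections::HashMap;
    use arrow_schema::{DataType, Field, Schema};

    // Hypothetical helper mirroring the test's `simple_field`: attach a
    // field ID to an Arrow field via Parquet metadata.
    fn with_field_id(name: &str, data_type: DataType, id: i32) -> Field {
        Field::new(name, data_type, true).with_metadata(HashMap::from([(
            "PARQUET:field_id".to_string(),
            id.to_string(),
        )]))
    }

    fn main() {
        // Parquet file written before the add_files import: field ID 1 is "name".
        let file_schema = Schema::new(vec![
            with_field_id("name", DataType::Utf8, 1),
            with_field_id("subdept", DataType::Utf8, 2),
        ]);
        // In the Iceberg table schema, field ID 1 is the partition column "id",
        // so resolving purely by field ID would read "name" bytes as "id".
        println!("{file_schema:?}");
    }
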
@@ -274,36 +274,41 @@ impl RecordBatchTransformer {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
 
+        // Check if partition columns (initial_default) have field IDs that also exist in Parquet.
+        // This means the same field ID maps to different columns, requiring name-based mapping.
+        let has_field_id_conflict = projected_iceberg_field_ids.iter().any(|field_id| {
+            if let Some(iceberg_field) = snapshot_schema.field_by_id(*field_id) {
+                // If this field has initial_default (partition column) and its field ID exists in Parquet
+                iceberg_field.initial_default.is_some()
+                    && field_id_to_source_schema_map.contains_key(field_id)
+            } else {
+                false
+            }
+        });
+
+        // Build name-based mapping if there's a field ID conflict
+        let name_to_source_schema_map: HashMap<String, (FieldRef, usize)> = source_schema
+            .fields()
+            .iter()
+            .enumerate()
+            .map(|(idx, field)| (field.name().clone(), (field.clone(), idx)))
+            .collect();
+
         projected_iceberg_field_ids.iter().map(|field_id|{
             let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
                 Error::new(ErrorKind::Unexpected, "could not find field in schema")
             )?;
             let target_type = target_field.data_type();
 
-            Ok(if let Some((source_field, source_index)) = field_id_to_source_schema_map.get(field_id) {
-                // column present in source
+            // Partition columns (initial_default) use constant values, not Parquet data.
+            // See Iceberg Java: BaseParquetReaders.java PartitionUtil.constantsMap().
+            let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(
+                Error::new(ErrorKind::Unexpected, "Field not found in snapshot schema")
+            )?;
 
-                if source_field.data_type().equals_datatype(target_type) {
-                    // no promotion required
-                    ColumnSource::PassThrough {
-                        source_index: *source_index
-                    }
-                } else {
-                    // promotion required
-                    ColumnSource::Promote {
-                        target_type: target_type.clone(),
-                        source_index: *source_index,
-                    }
-                }
-            } else {
-                // column must be added
-                let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(
-                    Error::new(ErrorKind::Unexpected, "Field not found in snapshot schema")
-                )?;
-
-                let default_value = if let Some(iceberg_default_value) =
-                    &iceberg_field.initial_default
-                {
+            let column_source = if iceberg_field.initial_default.is_some() {
+                // This is a partition column - use the constant value, don't read from file
+                let default_value = if let Some(iceberg_default_value) = &iceberg_field.initial_default {
                     let Literal::Primitive(primitive_literal) = iceberg_default_value else {
                         return Err(Error::new(
                             ErrorKind::Unexpected,
@@ -319,7 +324,54 @@ impl RecordBatchTransformer {
                     value: default_value,
                     target_type: target_type.clone(),
                 }
-            })
+            } else {
+                // For data columns (no initial_default), check if we need to use name-based or field ID-based mapping
+                let source_info = if has_field_id_conflict {
+                    // Use name-based mapping when partition columns conflict with Parquet field IDs
+                    name_to_source_schema_map.get(iceberg_field.name.as_str())
+                } else {
+                    // Use field ID-based mapping (normal case)
+                    field_id_to_source_schema_map.get(field_id)
+                };
+
+                if let Some((source_field, source_index)) = source_info {
+                    // column present in source
+                    if source_field.data_type().equals_datatype(target_type) {
+                        // no promotion required
+                        ColumnSource::PassThrough {
+                            source_index: *source_index
+                        }
+                    } else {
+                        // promotion required
+                        ColumnSource::Promote {
+                            target_type: target_type.clone(),
+                            source_index: *source_index,
+                        }
+                    }
+                } else {
+                    // column must be added (schema evolution case)
+                    let default_value = if let Some(iceberg_default_value) =
+                        &iceberg_field.initial_default
+                    {
+                        let Literal::Primitive(primitive_literal) = iceberg_default_value else {
+                            return Err(Error::new(
+                                ErrorKind::Unexpected,
+                                format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value)
+                            ));
+                        };
+                        Some(primitive_literal.clone())
+                    } else {
+                        None
+                    };
+
+                    ColumnSource::Add {
+                        value: default_value,
+                        target_type: target_type.clone(),
+                    }
+                }
+            };
+
+            Ok(column_source)
         }).collect()
     }
 
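Taken together, the two hunks above give column resolution a fixed precedence: a constant from initial_default first, then a source-column lookup (by name when a conflict was detected, by field ID otherwise), and finally the add-with-default path for columns missing from the file. A toy sketch of that precedence, with the crate's `ColumnSource` variants reduced to stand-ins:

    // Toy stand-in for the crate's ColumnSource; shows only the decision order.
    #[derive(Debug, PartialEq)]
    enum Source {
        Constant,    // initial_default present: partition value, never read from the file
        File(usize), // present in the file: pass through or promote the column at this index
        Add,         // absent from the file: fill with default/NULL (schema evolution)
    }

    fn resolve(
        has_initial_default: bool,
        has_field_id_conflict: bool,
        by_name: Option<usize>,
        by_field_id: Option<usize>,
    ) -> Source {
        if has_initial_default {
            Source::Constant
        } else if let Some(idx) = if has_field_id_conflict { by_name } else { by_field_id } {
            Source::File(idx)
        } else {
            Source::Add
        }
    }

    fn main() {
        // The scenario below: "id" has a default and a conflicting Parquet field ID.
        assert_eq!(resolve(true, true, None, Some(0)), Source::Constant);
        // "name" resolves by name under the conflict, not by its stale field ID.
        assert_eq!(resolve(false, true, Some(0), Some(1)), Source::File(0));
    }
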
@@ -696,4 +748,99 @@ mod test {
             value.to_string(),
         )]))
     }
+
+    /// Test for add_files partition column handling with field ID conflicts.
+    ///
+    /// This reproduces the scenario from Iceberg Java's TestAddFilesProcedure where:
+    /// - Hive-style partitioned Parquet files are imported via the add_files procedure
+    /// - Parquet files have field IDs: name (1), subdept (2)
+    /// - The Iceberg schema assigns different field IDs: id (1), name (2), dept (3), subdept (4)
+    /// - Partition columns (id, dept) have initial_default values from manifests
+    ///
+    /// Without proper handling, this would incorrectly:
+    /// 1. Try to read partition column "id" (field_id=1) from Parquet field_id=1 ("name")
+    /// 2. Read data column "name" (field_id=2) from Parquet field_id=2 ("subdept")
+    ///
+    /// The fix ensures:
+    /// 1. Partition columns with initial_default are ALWAYS read as constants (never from Parquet)
+    /// 2. Data columns use name-based mapping when field ID conflicts are detected
+    ///
+    /// See: Iceberg Java TestAddFilesProcedure.addDataPartitionedByIdAndDept()
+    #[test]
+    fn add_files_partition_columns_with_field_id_conflict() {
+        // Iceberg schema after add_files: id (partition), name, dept (partition), subdept
+        let snapshot_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(0)
+                .with_fields(vec![
+                    NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int))
+                        .with_initial_default(Literal::int(1))
+                        .into(),
+                    NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String))
+                        .with_initial_default(Literal::string("hr"))
+                        .into(),
+                    NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String))
+                        .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Parquet file schema: name (field_id=1), subdept (field_id=2)
+        // Note: Partition columns (id, dept) are NOT in the Parquet file - they're in directory paths
+        let parquet_schema = Arc::new(ArrowSchema::new(vec![
+            simple_field("name", DataType::Utf8, true, "1"),
+            simple_field("subdept", DataType::Utf8, true, "2"),
+        ]));
+
+        let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
+
+        let mut transformer = RecordBatchTransformer::build(snapshot_schema, &projected_field_ids);
+
+        // Create a Parquet RecordBatch with data for: name="John Doe", subdept="communications"
+        let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
+            Arc::new(StringArray::from(vec!["John Doe"])),
+            Arc::new(StringArray::from(vec!["communications"])),
+        ])
+        .unwrap();
+
+        let result = transformer.process_record_batch(parquet_batch).unwrap();
+
+        // Verify the transformed RecordBatch has:
+        // - id=1 (from initial_default, not from Parquet)
+        // - name="John Doe" (from Parquet, matched by name despite the field ID conflict)
+        // - dept="hr" (from initial_default, not from Parquet)
+        // - subdept="communications" (from Parquet, matched by name)
+        assert_eq!(result.num_columns(), 4);
+        assert_eq!(result.num_rows(), 1);
+
+        let id_column = result
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_column.value(0), 1);
+
+        let name_column = result
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(name_column.value(0), "John Doe");
+
+        let dept_column = result
+            .column(2)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(dept_column.value(0), "hr");
+
+        let subdept_column = result
+            .column(3)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(subdept_column.value(0), "communications");
+    }
 }
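
Downstream of this selection, the constant and Add paths still have to materialize full-length Arrow arrays from a primitive literal; presumably something along these lines (a sketch using arrow-rs directly, not the crate's actual code):

    use std::sync::Arc;
    use arrow_array::{Array, ArrayRef, Int32Array, StringArray};

    // Sketch: expand a constant into a column of `num_rows` values, as the
    // constant/Add paths must do for columns absent from the Parquet file.
    fn constant_i32(value: i32, num_rows: usize) -> ArrayRef {
        Arc::new(Int32Array::from(vec![value; num_rows]))
    }

    fn constant_utf8(value: &str, num_rows: usize) -> ArrayRef {
        Arc::new(StringArray::from(vec![value; num_rows]))
    }

    fn main() {
        // For the one-row batch in the test: id = 1, dept = "hr".
        let id = constant_i32(1, 1);
        let dept = constant_utf8("hr", 1);
        assert_eq!(id.len(), 1);
        assert_eq!(dept.len(), 1);
    }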
