
Commit d85d675

Disambiguate partition values.
1 parent 03c9f4e commit d85d675

File tree: 1 file changed (+17 / -5 lines)

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 17 additions & 5 deletions
@@ -274,8 +274,8 @@ impl RecordBatchTransformer {
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
 
-        // Check if partition columns (initial_default) have field IDs that also exist in Parquet.
-        // This means the same field ID maps to different columns, requiring name-based mapping.
+        // Check if partition columns (initial_default) conflict with Parquet field IDs.
+        // If so, use name-based mapping for data columns (common in add_files imports).
         let has_field_id_conflict = projected_iceberg_field_ids.iter().any(|field_id| {
             if let Some(iceberg_field) = snapshot_schema.field_by_id(*field_id) {
                 // If this field has initial_default (partition column) and its field ID exists in Parquet
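
To make the conflict that the new comment describes concrete, below is a minimal, self-contained Rust sketch of the check; the FieldSketch struct and the two maps are simplified stand-ins for the crate's schema lookups, not its actual API. The idea: if a projected field that carries an initial_default (an identity partition column) has a field ID that also appears in the Parquet file, the same ID names two different things, so data columns must be resolved by name instead.

use std::collections::HashMap;

/// Hypothetical stand-in for an Iceberg schema field: its name and whether it
/// carries an initial_default (which marks partition columns whose values come
/// from metadata rather than from file data).
struct FieldSketch {
    name: String,
    has_initial_default: bool,
}

/// True when a projected field with an initial_default also has its field ID
/// present in the Parquet file. The ID then refers both to a metadata partition
/// value and to a physical column, so name-based mapping is required.
fn has_field_id_conflict(
    projected_field_ids: &[i32],
    snapshot_fields: &HashMap<i32, FieldSketch>,
    parquet_field_ids: &HashMap<i32, usize>, // field ID -> column index in the file
) -> bool {
    projected_field_ids.iter().any(|field_id| {
        snapshot_fields
            .get(field_id)
            .map(|f| f.has_initial_default && parquet_field_ids.contains_key(field_id))
            .unwrap_or(false)
    })
}

fn main() {
    // An add_files-style import: the Parquet file reused field ID 1 for a data
    // column, while the Iceberg table assigned ID 1 to partition column "dept".
    let snapshot_fields = HashMap::from([
        (1, FieldSketch { name: "dept".to_string(), has_initial_default: true }),
        (2, FieldSketch { name: "id".to_string(), has_initial_default: false }),
    ]);
    let parquet_field_ids = HashMap::from([(1, 0), (2, 1)]);

    if has_field_id_conflict(&[1, 2], &snapshot_fields, &parquet_field_ids) {
        // Fall back to resolving data columns by name ("dept", "id") rather than by ID.
        println!("field ID conflict detected: use name-based column mapping");
    }
}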
@@ -300,13 +300,25 @@ impl RecordBatchTransformer {
             )?;
             let target_type = target_field.data_type();
 
-            // Partition columns (initial_default) use constant values, not Parquet data.
-            // See Iceberg Java: BaseParquetReaders.java PartitionUtil.constantsMap().
             let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(
                 Error::new(ErrorKind::Unexpected, "Field not found in snapshot schema")
             )?;
 
-            let column_source = if iceberg_field.initial_default.is_some() {
+            // Check if this field is present in the Parquet file
+            let is_in_parquet = if has_field_id_conflict {
+                name_to_source_schema_map.contains_key(iceberg_field.name.as_str())
+            } else {
+                field_id_to_source_schema_map.contains_key(field_id)
+            };
+
+            // Fields with initial_default that are NOT in Parquet are partition columns
+            // (common in add_files scenario where partition columns are in directory paths).
+            // Per spec (Scan Planning: "Return value from partition metadata for Identity transforms").
+            // See Iceberg Java: BaseParquetReaders.java uses PartitionUtil.constantsMap() for this.
+            //
+            // Fields with initial_default that ARE in Parquet should be read normally
+            // (e.g., source columns for bucket partitioning like 'id' in PARTITIONED BY bucket(4, id)).
+            let column_source = if iceberg_field.initial_default.is_some() && !is_in_parquet {
                 // This is a partition column - use the constant value, don't read from file
                 let default_value = if let Some(iceberg_default_value) = &iceberg_field.initial_default {
                     let Literal::Primitive(primitive_literal) = iceberg_default_value else {
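
The hunk above boils down to a two-step decision: first determine whether the projected field is physically present in the Parquet file (by name under a field-ID conflict, by field ID otherwise), then treat a field with an initial_default that is absent from the file as a metadata constant, while reading any field that is present, including partition source columns such as 'id' in bucket(4, id), as a normal data column. The sketch below restates that logic with simplified stand-in types; ColumnSource, PrimitiveLiteral, and ProjectedField are illustrative, not the crate's types.

use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum PrimitiveLiteral {
    Int(i32),
    String(String),
}

/// Where a projected column's values come from.
#[derive(Debug, PartialEq)]
enum ColumnSource {
    /// A physical column in the Parquet file, keyed by its index there.
    ParquetColumn(usize),
    /// A constant taken from partition metadata (the field's initial_default).
    Constant(PrimitiveLiteral),
}

struct ProjectedField {
    id: i32,
    name: String,
    initial_default: Option<PrimitiveLiteral>,
}

fn resolve_column_source(
    field: &ProjectedField,
    has_field_id_conflict: bool,
    by_name: &HashMap<String, usize>, // column name -> index in the Parquet file
    by_id: &HashMap<i32, usize>,      // field ID    -> index in the Parquet file
) -> Option<ColumnSource> {
    // Presence check: under a field-ID conflict, trust names; otherwise field IDs.
    let parquet_index = if has_field_id_conflict {
        by_name.get(field.name.as_str()).copied()
    } else {
        by_id.get(&field.id).copied()
    };

    match (&field.initial_default, parquet_index) {
        // Has initial_default and is NOT in the file: identity partition column,
        // fill with the constant instead of reading data pages.
        (Some(default), None) => Some(ColumnSource::Constant(default.clone())),
        // Present in the file (with or without a default, e.g. the source column
        // of bucket(4, id)): read it normally.
        (_, Some(idx)) => Some(ColumnSource::ParquetColumn(idx)),
        // Neither a default nor a physical column: caller must handle this case.
        (None, None) => None,
    }
}

fn main() {
    let by_name = HashMap::from([("id".to_string(), 0)]);
    let by_id = HashMap::from([(2, 0)]);

    let dept = ProjectedField {
        id: 1,
        name: "dept".to_string(),
        initial_default: Some(PrimitiveLiteral::String("engineering".to_string())),
    };
    let id = ProjectedField { id: 2, name: "id".to_string(), initial_default: None };

    // 'dept' lives only in the directory path -> constant from partition metadata.
    assert_eq!(
        resolve_column_source(&dept, false, &by_name, &by_id),
        Some(ColumnSource::Constant(PrimitiveLiteral::String("engineering".to_string())))
    );
    // 'id' is a real Parquet column -> read from the file.
    assert_eq!(
        resolve_column_source(&id, false, &by_name, &by_id),
        Some(ColumnSource::ParquetColumn(0))
    );
}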
