Skip to content

Commit 7adc770

Browse files
committed
Address PR feedback.
1 parent d0ec778 commit 7adc770

File tree

2 files changed

+42
-27
lines changed

2 files changed

+42
-27
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,7 @@ mod tests {
534534
use std::fs::File;
535535
use std::sync::Arc;
536536

537+
use arrow_array::cast::AsArray;
537538
use arrow_array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray};
538539
use arrow_schema::{DataType, Field, Fields};
539540
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -685,12 +686,13 @@ mod tests {
685686
assert!(result.is_none()); // no pos dels for file 3
686687
}
687688

688-
/// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow
689-
/// validation errors when missing REQUIRED columns are filled with NULLs.
689+
/// Verifies that evolve_schema on partial-schema equality deletes works correctly
690+
/// when only equality_ids columns are evolved, not all table columns.
690691
///
691-
/// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java.
692+
/// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
693+
/// equality delete files can contain only a subset of columns.
692694
#[tokio::test]
693-
async fn test_partial_schema_equality_deletes_evolve_fails() {
695+
async fn test_partial_schema_equality_deletes_evolve_succeeds() {
694696
let tmp_dir = TempDir::new().unwrap();
695697
let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
696698

@@ -750,23 +752,32 @@ mod tests {
750752
.await
751753
.unwrap();
752754

753-
let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema)
754-
.await
755-
.unwrap();
755+
// Only evolve the equality_ids columns (field 2), not all table columns
756+
let equality_ids = vec![2];
757+
let evolved_stream =
758+
BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema, &equality_ids)
759+
.await
760+
.unwrap();
756761

757-
let result = evolved_stream.next().await.unwrap();
762+
let result = evolved_stream.try_collect::<Vec<_>>().await;
758763

759764
assert!(
760-
result.is_err(),
761-
"Expected error from evolve_schema adding NULL to non-nullable column"
765+
result.is_ok(),
766+
"Expected success when evolving only equality_ids columns, got error: {:?}",
767+
result.err()
762768
);
763769

764-
let err = result.unwrap_err();
765-
let err_msg = err.to_string();
766-
assert!(
767-
err_msg.contains("non-nullable") || err_msg.contains("null values"),
768-
"Expected null value error, got: {}",
769-
err_msg
770-
);
770+
let batches = result.unwrap();
771+
assert_eq!(batches.len(), 1);
772+
773+
let batch = &batches[0];
774+
assert_eq!(batch.num_rows(), 3);
775+
assert_eq!(batch.num_columns(), 1); // Only 'data' column
776+
777+
// Verify the actual values are preserved after schema evolution
778+
let data_col = batch.column(0).as_string::<i32>();
779+
assert_eq!(data_col.value(0), "a");
780+
assert_eq!(data_col.value(1), "d");
781+
assert_eq!(data_col.value(2), "g");
771782
}
772783
}

crates/iceberg/src/arrow/delete_file_loader.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,17 @@ impl BasicDeleteFileLoader {
7171
Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
7272
}
7373

74-
/// Evolves the schema of the RecordBatches from an equality delete file
74+
/// Evolves the schema of the RecordBatches from an equality delete file.
75+
///
76+
/// Per the [Iceberg spec](https://iceberg.apache.org/spec/#equality-delete-files),
77+
/// only evolves the specified `equality_ids` columns, not all table columns.
7578
pub(crate) async fn evolve_schema(
7679
record_batch_stream: ArrowRecordBatchStream,
7780
target_schema: Arc<Schema>,
81+
equality_ids: &[i32],
7882
) -> Result<ArrowRecordBatchStream> {
79-
let eq_ids = target_schema
80-
.as_ref()
81-
.field_id_to_name_map()
82-
.keys()
83-
.cloned()
84-
.collect::<Vec<_>>();
85-
8683
let mut record_batch_transformer =
87-
RecordBatchTransformer::build(target_schema.clone(), &eq_ids);
84+
RecordBatchTransformer::build(target_schema.clone(), equality_ids);
8885

8986
let record_batch_stream = record_batch_stream.map(move |record_batch| {
9087
record_batch.and_then(|record_batch| {
@@ -105,7 +102,14 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
105102
) -> Result<ArrowRecordBatchStream> {
106103
let raw_batch_stream = self.parquet_to_batch_stream(&task.file_path).await?;
107104

108-
Self::evolve_schema(raw_batch_stream, schema).await
105+
// For equality deletes, only evolve the equality_ids columns.
106+
// For positional deletes (equality_ids is None), use all field IDs.
107+
let field_ids = match &task.equality_ids {
108+
Some(ids) => ids.clone(),
109+
None => schema.field_id_to_name_map().keys().cloned().collect(),
110+
};
111+
112+
Self::evolve_schema(raw_batch_stream, schema, &field_ids).await
109113
}
110114
}
111115

0 commit comments

Comments (0)