diff --git a/kernel/src/engine/arrow_expression/apply_schema.rs b/kernel/src/engine/arrow_expression/apply_schema.rs
index ac3cbf14c2..c58d239170 100644
--- a/kernel/src/engine/arrow_expression/apply_schema.rs
+++ b/kernel/src/engine/arrow_expression/apply_schema.rs
@@ -15,9 +15,14 @@ use crate::engine::ensure_data_types::ensure_data_types;
 use crate::error::{DeltaResult, Error};
 use crate::schema::{ArrayType, DataType, MapType, Schema, StructField};
 
-// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch where the
-// names of fields, nullable, and metadata in the struct have been transformed to match those in
-// schema specified by `schema`
+// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch` where
+// the names of fields, nullable, and metadata in the struct have been transformed to match those
+// in the schema specified by `schema`.
+//
+// Note: If the struct array has top-level nulls, the child columns are expected to already have
+// those nulls propagated. Arrow's JSON reader does this automatically, and parquet data goes
+// through `fix_nested_null_masks` which handles it. We decompose the struct and discard its null
+// buffer since RecordBatch cannot have top-level nulls.
 pub(crate) fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<RecordBatch> {
     let DataType::Struct(struct_schema) = schema else {
         return Err(Error::generic(
@@ -25,14 +30,8 @@ pub(crate) fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<
         ));
     };
     let applied = apply_schema_to_struct(array, struct_schema)?;
-    let (fields, columns, nulls) = applied.into_parts();
-    if let Some(nulls) = nulls {
-        if nulls.null_count() != 0 {
-            return Err(Error::invalid_struct_data(
-                "Top-level nulls in struct are not supported",
-            ));
-        }
-    }
+    let (fields, columns, _nulls) = applied.into_parts();
+
     Ok(RecordBatch::try_new(
         Arc::new(ArrowSchema::new(fields)),
         columns,
@@ -191,6 +190,7 @@ mod apply_schema_validation_tests {
     use std::sync::Arc;
 
     use crate::arrow::array::{Int32Array, StructArray};
+    use crate::arrow::buffer::{BooleanBuffer, NullBuffer};
     use crate::arrow::datatypes::{
         DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
     };
@@ -238,4 +238,75 @@ mod apply_schema_validation_tests {
             StructField::new("b", DataType::INTEGER, false),
         ])
     }
+
+    /// Test that apply_schema handles structs with top-level nulls correctly.
+    ///
+    /// This simulates a Delta log scenario where each row is one action type (add, remove, etc.).
+    /// When extracting `add.stats_parsed`, rows where `add` is null (e.g., remove actions) should
+    /// have null child columns. The child columns are expected to already have nulls propagated
+    /// (Arrow's JSON reader does this, and parquet data goes through `fix_nested_null_masks`).
+    #[test]
+    fn test_apply_schema_handles_top_level_nulls() {
+        // Create a struct array with 4 rows where rows 1 and 3 have top-level nulls.
+        // This simulates: [add_action, remove_action, add_action, remove_action]
+        // where remove_action rows have null for the entire struct.
+        let field_a = ArrowField::new("a", ArrowDataType::Int32, true);
+        let field_b = ArrowField::new("b", ArrowDataType::Int32, true);
+        let schema = ArrowSchema::new(vec![field_a, field_b]);
+
+        // Child columns with nulls already propagated (simulating what Arrow readers do).
+        // Rows 1 and 3 are null because the parent struct is null at those positions.
+        let a_data = Int32Array::from(vec![Some(1), None, Some(3), None]);
+        let b_data = Int32Array::from(vec![Some(10), None, Some(30), None]);
+
+        // Top-level struct nulls: rows 0 and 2 are valid, rows 1 and 3 are null
+        let null_buffer = NullBuffer::new(BooleanBuffer::from(vec![true, false, true, false]));
+
+        let struct_array = StructArray::try_new(
+            schema.fields.clone(),
+            vec![Arc::new(a_data), Arc::new(b_data)],
+            Some(null_buffer),
+        )
+        .unwrap();
+
+        // Target schema with nullable fields
+        let target_schema = DataType::Struct(Box::new(StructType::new_unchecked([
+            StructField::new("a", DataType::INTEGER, true),
+            StructField::new("b", DataType::INTEGER, true),
+        ])));
+
+        // Apply schema - should successfully convert to RecordBatch
+        let result = apply_schema(&struct_array, &target_schema).unwrap();
+
+        assert_eq!(result.num_rows(), 4);
+        assert_eq!(result.num_columns(), 2);
+
+        // Verify columns preserve nulls from child arrays
+        let col_a = result.column(0);
+        assert!(col_a.is_valid(0), "Row 0 should be valid");
+        assert!(col_a.is_null(1), "Row 1 should be null");
+        assert!(col_a.is_valid(2), "Row 2 should be valid");
+        assert!(col_a.is_null(3), "Row 3 should be null");
+
+        let col_b = result.column(1);
+        assert!(col_b.is_valid(0), "Row 0 should be valid");
+        assert!(col_b.is_null(1), "Row 1 should be null");
+        assert!(col_b.is_valid(2), "Row 2 should be valid");
+        assert!(col_b.is_null(3), "Row 3 should be null");
+
+        // Verify the actual values for valid rows
+        let col_a = col_a
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("column a should be Int32Array");
+        let col_b = col_b
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("column b should be Int32Array");
+
+        assert_eq!(col_a.value(0), 1);
+        assert_eq!(col_a.value(2), 3);
+        assert_eq!(col_b.value(0), 10);
+        assert_eq!(col_b.value(2), 30);
+    }
 }
diff --git a/kernel/src/engine/arrow_expression/tests.rs b/kernel/src/engine/arrow_expression/tests.rs
index c949933d60..f2f490daf6 100644
--- a/kernel/src/engine/arrow_expression/tests.rs
+++ b/kernel/src/engine/arrow_expression/tests.rs
@@ -827,6 +827,8 @@ fn test_create_one_mismatching_scalar_types() {
 
 #[test]
 fn test_create_one_not_null_struct() {
+    // Creating a NOT NULL struct field with null values should error.
+    // The error comes from Arrow's RecordBatch validation (non-nullable column has nulls).
     let values: &[Scalar] = &[
         Scalar::Null(KernelDataType::INTEGER),
         Scalar::Null(KernelDataType::INTEGER),
@@ -841,12 +843,14 @@ fn test_create_one_not_null_struct() {
     let handler = ArrowEvaluationHandler;
     assert_result_error_with_message(
         handler.create_one(schema, values),
-        "Invalid struct data: Top-level nulls in struct are not supported",
+        "Column 'a' is declared as non-nullable but contains null values",
     );
 }
 
 #[test]
 fn test_create_one_top_level_null() {
+    // Creating a NOT NULL field with null value should error.
+    // The error comes from Arrow's RecordBatch validation.
     let values = &[Scalar::Null(KernelDataType::INTEGER)];
     let handler = ArrowEvaluationHandler;
 
@@ -854,10 +858,10 @@ fn test_create_one_top_level_null() {
         "col_1",
         KernelDataType::INTEGER,
     )]));
-    assert!(matches!(
+    assert_result_error_with_message(
         handler.create_one(schema, values),
-        Err(Error::InvalidStructData(_))
-    ));
+        "Column 'col_1' is declared as non-nullable but contains null values",
+    );
 }
 
 #[test]