93 changes: 82 additions & 11 deletions kernel/src/engine/arrow_expression/apply_schema.rs
@@ -15,24 +15,23 @@ use crate::engine::ensure_data_types::ensure_data_types;
use crate::error::{DeltaResult, Error};
use crate::schema::{ArrayType, DataType, MapType, Schema, StructField};

-// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch where the
-// names of fields, nullable, and metadata in the struct have been transformed to match those in
-// schema specified by `schema`
+// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch` where
+// the names of fields, nullable, and metadata in the struct have been transformed to match those
+// in the schema specified by `schema`.
+//
+// Note: If the struct array has top-level nulls, the child columns are expected to already have
+// those nulls propagated. Arrow's JSON reader does this automatically, and parquet data goes
+// through `fix_nested_null_masks` which handles it. We decompose the struct and discard its null
+// buffer since RecordBatch cannot have top-level nulls.
pub(crate) fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<RecordBatch> {
let DataType::Struct(struct_schema) = schema else {
return Err(Error::generic(
"apply_schema at top-level must be passed a struct schema",
));
};
let applied = apply_schema_to_struct(array, struct_schema)?;
-    let (fields, columns, nulls) = applied.into_parts();
-    if let Some(nulls) = nulls {
-        if nulls.null_count() != 0 {
-            return Err(Error::invalid_struct_data(
-                "Top-level nulls in struct are not supported",
-            ));
-        }
-    }
+    let (fields, columns, _nulls) = applied.into_parts();

Ok(RecordBatch::try_new(
Arc::new(ArrowSchema::new(fields)),
columns,
@@ -191,6 +190,7 @@ mod apply_schema_validation_tests {
use std::sync::Arc;

use crate::arrow::array::{Int32Array, StructArray};
+    use crate::arrow::buffer::{BooleanBuffer, NullBuffer};
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
@@ -238,4 +238,75 @@
StructField::new("b", DataType::INTEGER, false),
])
}

+    /// Test that apply_schema handles structs with top-level nulls correctly.
+    ///
+    /// This simulates a Delta log scenario where each row is one action type (add, remove, etc.).
+    /// When extracting `add.stats_parsed`, rows where `add` is null (e.g., remove actions) should
+    /// have null child columns. The child columns are expected to already have nulls propagated
+    /// (Arrow's JSON reader does this, and parquet data goes through `fix_nested_null_masks`).
+    #[test]
+    fn test_apply_schema_handles_top_level_nulls() {
+        // Create a struct array with 4 rows where rows 1 and 3 have top-level nulls.
+        // This simulates: [add_action, remove_action, add_action, remove_action]
+        // where remove_action rows have null for the entire struct.
+        let field_a = ArrowField::new("a", ArrowDataType::Int32, true);
+        let field_b = ArrowField::new("b", ArrowDataType::Int32, true);
+        let schema = ArrowSchema::new(vec![field_a, field_b]);
+
+        // Child columns with nulls already propagated (simulating what Arrow readers do).
+        // Rows 1 and 3 are null because the parent struct is null at those positions.
+        let a_data = Int32Array::from(vec![Some(1), None, Some(3), None]);
+        let b_data = Int32Array::from(vec![Some(10), None, Some(30), None]);
+
+        // Top-level struct nulls: rows 0 and 2 are valid, rows 1 and 3 are null
+        let null_buffer = NullBuffer::new(BooleanBuffer::from(vec![true, false, true, false]));
+
+        let struct_array = StructArray::try_new(
+            schema.fields.clone(),
+            vec![Arc::new(a_data), Arc::new(b_data)],
+            Some(null_buffer),
+        )
+        .unwrap();
+
+        // Target schema with nullable fields
+        let target_schema = DataType::Struct(Box::new(StructType::new_unchecked([
+            StructField::new("a", DataType::INTEGER, true),
+            StructField::new("b", DataType::INTEGER, true),
+        ])));
+
+        // Apply schema - should successfully convert to RecordBatch
+        let result = apply_schema(&struct_array, &target_schema).unwrap();
+
+        assert_eq!(result.num_rows(), 4);
+        assert_eq!(result.num_columns(), 2);
+
+        // Verify columns preserve nulls from child arrays
+        let col_a = result.column(0);
+        assert!(col_a.is_valid(0), "Row 0 should be valid");
+        assert!(col_a.is_null(1), "Row 1 should be null");
+        assert!(col_a.is_valid(2), "Row 2 should be valid");
+        assert!(col_a.is_null(3), "Row 3 should be null");
+
+        let col_b = result.column(1);
+        assert!(col_b.is_valid(0), "Row 0 should be valid");
+        assert!(col_b.is_null(1), "Row 1 should be null");
+        assert!(col_b.is_valid(2), "Row 2 should be valid");
+        assert!(col_b.is_null(3), "Row 3 should be null");
+
+        // Verify the actual values for valid rows
+        let col_a = col_a
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("column a should be Int32Array");
+        let col_b = col_b
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("column b should be Int32Array");
+
+        assert_eq!(col_a.value(0), 1);
+        assert_eq!(col_a.value(2), 3);
+        assert_eq!(col_b.value(0), 10);
+        assert_eq!(col_b.value(2), 30);
+    }
}
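
For context, a minimal standalone sketch of the invariant the new note describes, written against plain arrow-rs (the `arrow::` paths here are an assumption for illustration; inside the kernel these types are re-exported under `crate::arrow::`). A `StructArray`'s own null buffer is independent of its children's validity, so once `apply_schema` discards the struct's buffer, only nulls already propagated into the child arrays survive:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, StructArray};
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    let fields = Fields::from(vec![Field::new("a", DataType::Int32, true)]);

    // Child column WITHOUT the parent's null propagated: row 1 still holds 2.
    let child: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), Some(2)]));

    // The parent struct itself is null at row 1.
    let parent_nulls = NullBuffer::new(BooleanBuffer::from(vec![true, false]));
    let s = StructArray::try_new(fields, vec![child], Some(parent_nulls)).unwrap();

    // The struct's validity masks row 1, but the child's does not. If the struct's
    // null buffer is discarded (as apply_schema now does), the child value at row 1
    // resurfaces -- which is why readers must propagate nulls into children first.
    assert!(s.is_null(1));
    assert!(s.column(0).is_valid(1));
}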
12 changes: 8 additions & 4 deletions kernel/src/engine/arrow_expression/tests.rs
@@ -827,6 +827,8 @@ fn test_create_one_mismatching_scalar_types() {

#[test]
fn test_create_one_not_null_struct() {
+    // Creating a NOT NULL struct field with null values should error.
+    // The error comes from Arrow's RecordBatch validation (non-nullable column has nulls).
let values: &[Scalar] = &[
Scalar::Null(KernelDataType::INTEGER),
Scalar::Null(KernelDataType::INTEGER),
@@ -841,23 +843,25 @@ fn test_create_one_not_null_struct() {
let handler = ArrowEvaluationHandler;
assert_result_error_with_message(
handler.create_one(schema, values),
"Invalid struct data: Top-level nulls in struct are not supported",
"Column 'a' is declared as non-nullable but contains null values",
);
}

#[test]
fn test_create_one_top_level_null() {
+    // Creating a NOT NULL field with null value should error.
+    // The error comes from Arrow's RecordBatch validation.
let values = &[Scalar::Null(KernelDataType::INTEGER)];
let handler = ArrowEvaluationHandler;

let schema = Arc::new(StructType::new_unchecked([StructField::not_null(
"col_1",
KernelDataType::INTEGER,
)]));
-    assert!(matches!(
-        handler.create_one(schema, values),
-        Err(Error::InvalidStructData(_))
-    ));
+    assert_result_error_with_message(
+        handler.create_one(schema, values),
+        "Column 'col_1' is declared as non-nullable but contains null values",
+    );
}

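
The expected message asserted in both updated tests is Arrow's own: `RecordBatch::try_new` rejects a batch in which a non-nullable field's column contains nulls, which is what replaces the kernel's old `InvalidStructData` error. A minimal sketch, again against plain arrow-rs (paths assumed; the kernel uses its `crate::arrow` re-exports):

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Field "a" is declared NOT NULL, but the column carries a null.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None]));

    // RecordBatch::try_new validates nullability and rejects the batch with the
    // same message the tests above assert on.
    let err = RecordBatch::try_new(schema, vec![col]).unwrap_err();
    assert!(err
        .to_string()
        .contains("Column 'a' is declared as non-nullable but contains null values"));
}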