Skip to content

Commit f126545

Browse files
committed
propagate
1 parent bc9a5d8 commit f126545

File tree

2 files changed

+113
-11
lines changed

2 files changed

+113
-11
lines changed

kernel/src/engine/arrow_expression/apply_schema.rs

Lines changed: 105 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::sync::Arc;
55
use itertools::Itertools;
66

77
use crate::arrow::array::{
8-
Array, ArrayRef, AsArray, ListArray, MapArray, RecordBatch, StructArray,
8+
make_array, Array, ArrayRef, AsArray, ListArray, MapArray, RecordBatch, StructArray,
99
};
10+
use crate::arrow::buffer::NullBuffer;
1011
use crate::arrow::datatypes::Schema as ArrowSchema;
1112
use crate::arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField};
1213

@@ -18,6 +19,10 @@ use crate::schema::{ArrayType, DataType, MapType, Schema, StructField};
1819
// Apply a schema to an array. The array _must_ be a `StructArray`. Returns a `RecordBatch where the
1920
// names of fields, nullable, and metadata in the struct have been transformed to match those in
2021
// schema specified by `schema`
22+
//
23+
// If the struct array has top-level nulls (rows where the entire struct is null), those nulls are
24+
// propagated to all child columns. This allows expressions like `add.stats_parsed` to return null
25+
// for rows where `add` is null, rather than erroring.
2126
pub(crate) fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<RecordBatch> {
2227
let DataType::Struct(struct_schema) = schema else {
2328
return Err(Error::generic(
@@ -26,19 +31,40 @@ pub(crate) fn apply_schema(array: &dyn Array, schema: &DataType) -> DeltaResult<
2631
};
2732
let applied = apply_schema_to_struct(array, struct_schema)?;
2833
let (fields, columns, nulls) = applied.into_parts();
29-
if let Some(nulls) = nulls {
30-
if nulls.null_count() != 0 {
31-
return Err(Error::invalid_struct_data(
32-
"Top-level nulls in struct are not supported",
33-
));
34+
35+
// If there are top-level nulls, propagate them to each child column.
36+
// This handles cases where we're extracting a struct from a nullable parent
37+
// (e.g., `add.stats_parsed` where `add` can be null for non-add rows).
38+
let columns = if let Some(ref struct_nulls) = nulls {
39+
if struct_nulls.null_count() > 0 {
40+
columns
41+
.into_iter()
42+
.map(|column| propagate_nulls_to_column(&column, struct_nulls))
43+
.collect()
44+
} else {
45+
columns
3446
}
35-
}
47+
} else {
48+
columns
49+
};
50+
3651
Ok(RecordBatch::try_new(
3752
Arc::new(ArrowSchema::new(fields)),
3853
columns,
3954
)?)
4055
}
4156

57+
/// Propagate a parent null buffer to a column, combining it with the column's existing nulls.
58+
fn propagate_nulls_to_column(column: &ArrayRef, parent_nulls: &NullBuffer) -> ArrayRef {
59+
let data = column.to_data();
60+
let combined_nulls = NullBuffer::union(Some(parent_nulls), data.nulls());
61+
let builder = data.into_builder().nulls(combined_nulls);
62+
// SAFETY: We're only adding more nulls to an existing valid array.
63+
// The union can only grow the set of NULL rows, preserving data validity.
64+
let data = unsafe { builder.build_unchecked() };
65+
make_array(data)
66+
}
67+
4268
// helper to transform an arrow field+col into the specified target type. If `rename` is specified
4369
// the field will be renamed to the contained `str`.
4470
fn new_field_with_metadata(
@@ -191,6 +217,7 @@ mod apply_schema_validation_tests {
191217
use std::sync::Arc;
192218

193219
use crate::arrow::array::{Int32Array, StructArray};
220+
use crate::arrow::buffer::{BooleanBuffer, NullBuffer};
194221
use crate::arrow::datatypes::{
195222
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
196223
};
@@ -238,4 +265,75 @@ mod apply_schema_validation_tests {
238265
StructField::new("b", DataType::INTEGER, false),
239266
])
240267
}
268+
269+
/// Test that top-level struct nulls are propagated to child columns.
270+
///
271+
/// This simulates a Delta log scenario where each row is one action type (add, remove, etc.).
272+
/// When extracting `add.stats_parsed`, rows where `add` is null (e.g., remove actions) should
273+
/// return null for all child columns rather than erroring.
274+
#[test]
275+
fn test_apply_schema_propagates_top_level_nulls() {
276+
// Create a struct array with 4 rows where rows 1 and 3 have top-level nulls.
277+
// This simulates: [add_action, remove_action, add_action, remove_action]
278+
// where remove_action rows have null for the entire struct.
279+
let field_a = ArrowField::new("a", ArrowDataType::Int32, true);
280+
let field_b = ArrowField::new("b", ArrowDataType::Int32, true);
281+
let schema = ArrowSchema::new(vec![field_a, field_b]);
282+
283+
// Child column data - note these have values even for rows that will be struct-null
284+
// because Arrow stores child data independently of the struct's null buffer.
285+
let a_data = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
286+
let b_data = Int32Array::from(vec![Some(10), Some(20), Some(30), Some(40)]);
287+
288+
// Top-level nulls: rows 0 and 2 are valid, rows 1 and 3 are null
289+
let null_buffer = NullBuffer::new(BooleanBuffer::from(vec![true, false, true, false]));
290+
291+
let struct_array = StructArray::try_new(
292+
schema.fields.clone(),
293+
vec![Arc::new(a_data), Arc::new(b_data)],
294+
Some(null_buffer),
295+
)
296+
.unwrap();
297+
298+
// Target schema with nullable fields
299+
let target_schema = DataType::Struct(Box::new(StructType::new_unchecked([
300+
StructField::new("a", DataType::INTEGER, true),
301+
StructField::new("b", DataType::INTEGER, true),
302+
])));
303+
304+
// Apply schema - this should propagate top-level nulls to child columns
305+
let result = apply_schema(&struct_array, &target_schema).unwrap();
306+
307+
assert_eq!(result.num_rows(), 4);
308+
assert_eq!(result.num_columns(), 2);
309+
310+
// Verify column "a" has nulls propagated from the struct's null buffer
311+
let col_a = result.column(0);
312+
assert!(col_a.is_valid(0), "Row 0 should be valid");
313+
assert!(col_a.is_null(1), "Row 1 should be null (struct was null)");
314+
assert!(col_a.is_valid(2), "Row 2 should be valid");
315+
assert!(col_a.is_null(3), "Row 3 should be null (struct was null)");
316+
317+
// Verify column "b" has nulls propagated from the struct's null buffer
318+
let col_b = result.column(1);
319+
assert!(col_b.is_valid(0), "Row 0 should be valid");
320+
assert!(col_b.is_null(1), "Row 1 should be null (struct was null)");
321+
assert!(col_b.is_valid(2), "Row 2 should be valid");
322+
assert!(col_b.is_null(3), "Row 3 should be null (struct was null)");
323+
324+
// Verify the actual values for valid rows
325+
let col_a = col_a
326+
.as_any()
327+
.downcast_ref::<Int32Array>()
328+
.expect("column a should be Int32Array");
329+
let col_b = col_b
330+
.as_any()
331+
.downcast_ref::<Int32Array>()
332+
.expect("column b should be Int32Array");
333+
334+
assert_eq!(col_a.value(0), 1);
335+
assert_eq!(col_a.value(2), 3);
336+
assert_eq!(col_b.value(0), 10);
337+
assert_eq!(col_b.value(2), 30);
338+
}
241339
}

kernel/src/engine/arrow_expression/tests.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,8 @@ fn test_create_one_mismatching_scalar_types() {
827827

828828
#[test]
829829
fn test_create_one_not_null_struct() {
830+
// Creating a NOT NULL struct field with null values should error.
831+
// The error comes from Arrow's RecordBatch validation (non-nullable column has nulls).
830832
let values: &[Scalar] = &[
831833
Scalar::Null(KernelDataType::INTEGER),
832834
Scalar::Null(KernelDataType::INTEGER),
@@ -841,23 +843,25 @@ fn test_create_one_not_null_struct() {
841843
let handler = ArrowEvaluationHandler;
842844
assert_result_error_with_message(
843845
handler.create_one(schema, values),
844-
"Invalid struct data: Top-level nulls in struct are not supported",
846+
"Column 'a' is declared as non-nullable but contains null values",
845847
);
846848
}
847849

848850
#[test]
849851
fn test_create_one_top_level_null() {
852+
// Creating a NOT NULL field with null value should error.
853+
// The error comes from Arrow's RecordBatch validation.
850854
let values = &[Scalar::Null(KernelDataType::INTEGER)];
851855
let handler = ArrowEvaluationHandler;
852856

853857
let schema = Arc::new(StructType::new_unchecked([StructField::not_null(
854858
"col_1",
855859
KernelDataType::INTEGER,
856860
)]));
857-
assert!(matches!(
861+
assert_result_error_with_message(
858862
handler.create_one(schema, values),
859-
Err(Error::InvalidStructData(_))
860-
));
863+
"Column 'col_1' is declared as non-nullable but contains null values",
864+
);
861865
}
862866

863867
#[test]

0 commit comments

Comments
 (0)