@@ -13,9 +13,10 @@ use crate::{
 };
 
 use arrow_array::{
-    cast::AsArray, new_null_array, Array as ArrowArray, GenericListArray, OffsetSizeTrait,
-    RecordBatch, StringArray, StructArray,
+    cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray,
+    OffsetSizeTrait, RecordBatch, StringArray, StructArray,
 };
+use arrow_buffer::NullBuffer;
 use arrow_json::{LineDelimitedWriter, ReaderBuilder};
 use arrow_schema::{
     DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
@@ -62,6 +63,21 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
     Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s.into())).with_backtrace()
 }
 
+/// Applies post-processing to data read from parquet files. This includes `reorder_struct_array`
+/// to ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns
+/// have accurate null masks that row visitors rely on for correctness.
+pub(crate) fn fixup_parquet_read<T>(
+    batch: RecordBatch,
+    requested_ordering: &[ReorderIndex],
+) -> DeltaResult<T>
+where
+    StructArray: Into<T>,
+{
+    let data = reorder_struct_array(batch.into(), requested_ordering)?;
+    let data = fix_nested_null_masks(data);
+    Ok(data.into())
+}
+
 /*
 * The code below implements proper pruning of columns when reading parquet, reordering of columns to
 * match the specified schema, and insertion of null columns if the requested schema includes a
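
A minimal usage sketch, assuming a `batch` freshly decoded by the parquet reader and an `ordering` slice produced by schema reconciliation; the `example` helper is hypothetical and only shows that `fixup_parquet_read` composes the two fixups:

```rust
use arrow_array::RecordBatch;

// Hypothetical caller; `ReorderIndex`, `DeltaResult`, and the fixup functions
// come from this module.
fn example(batch: RecordBatch, ordering: &[ReorderIndex]) -> DeltaResult<()> {
    // Type inference picks `T = RecordBatch` here, because arrow provides
    // `From<StructArray> for RecordBatch`.
    let fixed: RecordBatch = fixup_parquet_read(batch.clone(), ordering)?;

    // Equivalent explicit form, composing the two steps by hand:
    let manual: RecordBatch =
        fix_nested_null_masks(reorder_struct_array(batch.into(), ordering)?).into();
    assert_eq!(fixed, manual);
    Ok(())
}
```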
@@ -609,6 +625,53 @@ fn reorder_list<O: OffsetSizeTrait>(
     }
 }
 
+/// Use this function to recursively compute properly unioned null masks for all nested
+/// columns of a record batch, making it safe to project out and consume nested columns.
+///
+/// Arrow does not guarantee that the null masks associated with nested columns are accurate --
+/// instead, the reader must consult the union of the logical null masks of the column and all of
+/// its ancestors. The parquet reader stopped doing this automatically as of arrow-53.3, for example.
+pub fn fix_nested_null_masks(batch: StructArray) -> StructArray {
+    compute_nested_null_masks(batch, None)
+}
+
+/// Splits a `StructArray` into its parts, unions in the parent null mask, and uses the result to
+/// recursively update the children as well before putting everything back together.
+fn compute_nested_null_masks(sa: StructArray, parent_nulls: Option<&NullBuffer>) -> StructArray {
+    let (fields, columns, nulls) = sa.into_parts();
+    let nulls = NullBuffer::union(parent_nulls, nulls.as_ref());
+    let columns = columns
+        .into_iter()
+        .map(|column| match column.as_struct_opt() {
+            Some(sa) => Arc::new(compute_nested_null_masks(sa.clone(), nulls.as_ref())) as _,
+            None => {
+                let data = column.to_data();
+                let nulls = NullBuffer::union(nulls.as_ref(), data.nulls());
+                let builder = data.into_builder().nulls(nulls);
+                // Use an unchecked build to avoid paying a redundant O(k) validation cost for a
+                // `RecordBatch` with k leaf columns.
+                //
+                // SAFETY: The builder was constructed from an `ArrayData` we extracted from the
+                // column. The change we make is the null buffer, via `NullBuffer::union` with input
+                // null buffers that were _also_ extracted from the column and its parent. A union
+                // can only _grow_ the set of NULL rows, so data validity is preserved. Even if
+                // `parent_nulls` somehow had a length mismatch -- which it never should, having
+                // also been extracted from our grandparent -- the mismatch would have already
+                // caused `NullBuffer::union` to panic.
+                let data = unsafe { builder.build_unchecked() };
+                make_array(data)
+            }
+        })
+        .collect();
+
+    // Use an unchecked constructor to avoid paying a redundant O(n*k) null buffer validation
+    // cost for a `RecordBatch` with n rows and k leaf columns.
+    //
+    // SAFETY: We are simply reassembling the input `StructArray` we previously broke apart, with
+    // updated null buffers. See above for details about null buffer safety.
+    unsafe { StructArray::new_unchecked(fields, columns, nulls) }
+}
+
 /// Arrow lacks the functionality to json-parse a string column into a struct column -- even tho the
 /// JSON file reader does exactly the same thing. This function is a hack to work around that gap.
 pub(crate) fn parse_json(
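
A small standalone sketch of the `NullBuffer::union` semantics the recursion above relies on: a row is valid in the result only if it is valid in both inputs, so unioning a parent mask into a child can only mark more rows NULL, never fewer, and `None` behaves as "all valid":

```rust
use arrow_buffer::NullBuffer;

fn main() {
    // true = valid, false = NULL
    let parent = NullBuffer::from(&[true, false, true, true][..]);
    let child = NullBuffer::from(&[true, true, false, true][..]);

    // Valid only where BOTH parent and child are valid.
    let merged = NullBuffer::union(Some(&parent), Some(&child)).unwrap();
    assert_eq!(merged, NullBuffer::from(&[true, false, false, true][..]));

    // `None` means "no NULLs", so unioning with it changes nothing.
    assert_eq!(NullBuffer::union(None, Some(&child)).as_ref(), Some(&child));
}
```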
@@ -1432,4 +1495,107 @@ mod tests {
         );
         Ok(())
     }
+
+    #[test]
+    fn test_arrow_broken_nested_null_masks() {
+        use crate::engine::arrow_utils::fix_nested_null_masks;
+        use arrow::datatypes::{DataType, Field, Fields, Schema};
+        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+        // Parse some JSON into a nested schema
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "outer",
+            DataType::Struct(Fields::from(vec![
+                Field::new(
+                    "inner_nullable",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("leaf_non_null", DataType::Int32, false),
+                        Field::new("leaf_nullable", DataType::Int32, true),
+                    ])),
+                    true,
+                ),
+                Field::new(
+                    "inner_non_null",
+                    DataType::Struct(Fields::from(vec![
+                        Field::new("leaf_non_null", DataType::Int32, false),
+                        Field::new("leaf_nullable", DataType::Int32, true),
+                    ])),
+                    false,
+                ),
+            ])),
+            true,
+        )]));
+        let json_string = r#"
+{ }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 1 } } }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 2, "leaf_nullable" : 3 } } }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 4 }, "inner_nullable" : { "leaf_non_null" : 5 } } }
+{ "outer" : { "inner_non_null" : { "leaf_non_null" : 6 }, "inner_nullable" : { "leaf_non_null" : 7, "leaf_nullable": 8 } } }
+"#;
+        let batch1 = arrow::json::ReaderBuilder::new(schema.clone())
+            .build(json_string.as_bytes())
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        println!("Batch 1: {batch1:?}");
+
+        macro_rules! assert_nulls {
+            ($column:expr, $nulls:expr) => {
+                assert_eq!($column.nulls().unwrap(), &NullBuffer::from(&$nulls[..]));
+            };
+        }
+
+        // If any of these assertions ever fails, it means the arrow JSON reader started producing
+        // incomplete nested NULL masks. If that happens, we need to update all JSON reads to call
+        // `fix_nested_null_masks`.
+        let outer_1 = batch1.column(0).as_struct();
+        assert_nulls!(outer_1, [false, true, true, true, true]);
+        let inner_nullable_1 = outer_1.column(0).as_struct();
+        assert_nulls!(inner_nullable_1, [false, false, false, true, true]);
+        let nullable_leaf_non_null_1 = inner_nullable_1.column(0);
+        assert_nulls!(nullable_leaf_non_null_1, [false, false, false, true, true]);
+        let nullable_leaf_nullable_1 = inner_nullable_1.column(1);
+        assert_nulls!(nullable_leaf_nullable_1, [false, false, false, false, true]);
+        let inner_non_null_1 = outer_1.column(1).as_struct();
+        assert_nulls!(inner_non_null_1, [false, true, true, true, true]);
+        let non_null_leaf_non_null_1 = inner_non_null_1.column(0);
+        assert_nulls!(non_null_leaf_non_null_1, [false, true, true, true, true]);
+        let non_null_leaf_nullable_1 = inner_non_null_1.column(1);
+        assert_nulls!(non_null_leaf_nullable_1, [false, false, true, false, false]);
+
+        // Write the batch to a parquet file and read it back
+        let mut buffer = vec![];
+        let mut writer =
+            parquet::arrow::ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
+        writer.write(&batch1).unwrap();
+        writer.close().unwrap(); // writer must be closed to write the footer
+        let batch2 = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(buffer))
+            .unwrap()
+            .build()
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        println!("Batch 2 before: {batch2:?}");
+
+        // As of arrow-53.3, the parquet reader returns broken nested NULL masks; fix them up.
+        let batch2 = RecordBatch::from(fix_nested_null_masks(batch2.into()));
+
+        // Verify the data survived the round trip
+        let outer_2 = batch2.column(0).as_struct();
+        assert_eq!(outer_2, outer_1);
+        let inner_nullable_2 = outer_2.column(0).as_struct();
+        assert_eq!(inner_nullable_2, inner_nullable_1);
+        let nullable_leaf_non_null_2 = inner_nullable_2.column(0);
+        assert_eq!(nullable_leaf_non_null_2, nullable_leaf_non_null_1);
+        let nullable_leaf_nullable_2 = inner_nullable_2.column(1);
+        assert_eq!(nullable_leaf_nullable_2, nullable_leaf_nullable_1);
+        let inner_non_null_2 = outer_2.column(1).as_struct();
+        assert_eq!(inner_non_null_2, inner_non_null_1);
+        let non_null_leaf_non_null_2 = inner_non_null_2.column(0);
+        assert_eq!(non_null_leaf_non_null_2, non_null_leaf_non_null_1);
+        let non_null_leaf_nullable_2 = inner_non_null_2.column(1);
+        assert_eq!(non_null_leaf_nullable_2, non_null_leaf_nullable_1);
+    }
 }