@@ -224,12 +224,14 @@ impl CachingDeleteFileLoader {
224224                let  ( sender,  receiver)  = channel ( ) ; 
225225                del_filter. insert_equality_delete ( & task. file_path ,  receiver) ; 
226226
227-                 // Equality deletes intentionally have partial schemas. Schema evolution would add 
228-                 // NULL values for missing REQUIRED columns, causing Arrow validation to fail. 
229227                Ok ( DeleteFileContext :: FreshEqDel  { 
230-                     batch_stream :  basic_delete_file_loader
231-                         . parquet_to_batch_stream ( & task. file_path ) 
232-                         . await ?, 
228+                     batch_stream :  BasicDeleteFileLoader :: evolve_schema ( 
229+                         basic_delete_file_loader
230+                             . parquet_to_batch_stream ( & task. file_path ) 
231+                             . await ?, 
232+                         schema, 
233+                     ) 
234+                     . await ?, 
233235                    sender, 
234236                    equality_ids :  HashSet :: from_iter ( task. equality_ids . clone ( ) . unwrap ( ) ) , 
235237                } ) 
@@ -685,91 +687,6 @@ mod tests {
685687        assert ! ( result. is_none( ) ) ;  // no pos dels for file 3 
686688    } 
687689
688-     /// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow 
689- /// validation errors when missing REQUIRED columns are filled with NULLs. 
690- /// 
691- /// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java. 
692- #[ tokio:: test]  
693-     async  fn  test_partial_schema_equality_deletes_evolve_fails ( )  { 
694-         let  tmp_dir = TempDir :: new ( ) . unwrap ( ) ; 
695-         let  table_location = tmp_dir. path ( ) . as_os_str ( ) . to_str ( ) . unwrap ( ) ; 
696- 
697-         // Create table schema with REQUIRED fields 
698-         let  table_schema = Arc :: new ( 
699-             Schema :: builder ( ) 
700-                 . with_schema_id ( 1 ) 
701-                 . with_fields ( vec ! [ 
702-                     crate :: spec:: NestedField :: required( 
703-                         1 , 
704-                         "id" , 
705-                         crate :: spec:: Type :: Primitive ( crate :: spec:: PrimitiveType :: Int ) , 
706-                     ) 
707-                     . into( ) , 
708-                     crate :: spec:: NestedField :: required( 
709-                         2 , 
710-                         "data" , 
711-                         crate :: spec:: Type :: Primitive ( crate :: spec:: PrimitiveType :: String ) , 
712-                     ) 
713-                     . into( ) , 
714-                 ] ) 
715-                 . build ( ) 
716-                 . unwrap ( ) , 
717-         ) ; 
718- 
719-         // Write equality delete file with PARTIAL schema (only 'data' column) 
720-         let  delete_file_path = { 
721-             let  data_vals = vec ! [ "a" ,  "d" ,  "g" ] ; 
722-             let  data_col = Arc :: new ( StringArray :: from ( data_vals) )  as  ArrayRef ; 
723- 
724-             let  delete_schema = Arc :: new ( arrow_schema:: Schema :: new ( vec ! [ simple_field( 
725-                 "data" , 
726-                 DataType :: Utf8 , 
727-                 false , 
728-                 "2" ,  // field ID 
729-             ) ] ) ) ; 
730- 
731-             let  delete_batch = RecordBatch :: try_new ( delete_schema. clone ( ) ,  vec ! [ data_col] ) . unwrap ( ) ; 
732- 
733-             let  path = format ! ( "{}/partial-eq-deletes.parquet" ,  & table_location) ; 
734-             let  file = File :: create ( & path) . unwrap ( ) ; 
735-             let  props = WriterProperties :: builder ( ) 
736-                 . set_compression ( Compression :: SNAPPY ) 
737-                 . build ( ) ; 
738-             let  mut  writer =
739-                 ArrowWriter :: try_new ( file,  delete_batch. schema ( ) ,  Some ( props) ) . unwrap ( ) ; 
740-             writer. write ( & delete_batch) . expect ( "Writing batch" ) ; 
741-             writer. close ( ) . unwrap ( ) ; 
742-             path
743-         } ; 
744- 
745-         let  file_io = FileIO :: from_path ( table_location) . unwrap ( ) . build ( ) . unwrap ( ) ; 
746-         let  basic_delete_file_loader = BasicDeleteFileLoader :: new ( file_io. clone ( ) ) ; 
747- 
748-         let  batch_stream = basic_delete_file_loader
749-             . parquet_to_batch_stream ( & delete_file_path) 
750-             . await 
751-             . unwrap ( ) ; 
752- 
753-         let  mut  evolved_stream = BasicDeleteFileLoader :: evolve_schema ( batch_stream,  table_schema) 
754-             . await 
755-             . unwrap ( ) ; 
756- 
757-         let  result = evolved_stream. next ( ) . await . unwrap ( ) ; 
758- 
759-         assert ! ( 
760-             result. is_err( ) , 
761-             "Expected error from evolve_schema adding NULL to non-nullable column" 
762-         ) ; 
763- 
764-         let  err = result. unwrap_err ( ) ; 
765-         let  err_msg = err. to_string ( ) ; 
766-         assert ! ( 
767-             err_msg. contains( "non-nullable" )  || err_msg. contains( "null values" ) , 
768-             "Expected null value error, got: {}" , 
769-             err_msg
770-         ) ; 
771-     } 
772- 
773690    /// Test loading a FileScanTask with BOTH positional and equality deletes. 
774691/// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors. 
775692#[ tokio:: test]  
0 commit comments