@@ -9,14 +9,9 @@ use crate::arrow::{
99 array:: { create_array, RecordBatch } ,
1010 datatypes:: Field ,
1111} ;
12- <<<<<<< HEAD
13- use crate :: checkpoint:: create_last_checkpoint_data ;
14- use crate :: engine:: arrow_data:: ArrowEngineData ;
15- use crate :: engine:: default:: executor:: tokio:: TokioMultiThreadExecutor ;
16- =======
1712use crate :: checkpoint:: { create_last_checkpoint_data, CheckpointDataIterator } ;
1813use crate :: engine:: arrow_data:: { ArrowEngineData , EngineDataArrowExt } ;
19- >>>>>>> 9 df95006 ( write-integration )
14+ use crate :: engine :: default :: executor :: tokio :: TokioMultiThreadExecutor ;
2015use crate :: engine:: default:: DefaultEngineBuilder ;
2116use crate :: log_replay:: HasSelectionVector ;
2217use crate :: schema:: { DataType as KernelDataType , StructField , StructType } ;
@@ -629,22 +624,6 @@ async fn test_no_checkpoint_staged_commits() -> DeltaResult<()> {
629624 Ok ( ( ) )
630625}
631626
632- <<<<<<< HEAD
633- #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
634- async fn test_snapshot_checkpoint( ) -> DeltaResult < ( ) > {
635- let ( store, _) = new_in_memory_store( ) ;
636- let executor = Arc :: new( TokioMultiThreadExecutor :: new(
637- tokio:: runtime:: Handle :: current( ) ,
638- ) ) ;
639- let engine = DefaultEngineBuilder :: new( store. clone( ) )
640- . with_task_executor( executor)
641- . build( ) ;
642-
643- // Version 0: metadata & protocol
644- write_commit_to_store(
645- & store,
646- vec ! [ create_metadata_action( ) , create_basic_protocol_action( ) ] ,
647- =======
648627/// Create a Metadata action with writeStatsAsStruct enabled
649628fn create_metadata_action_with_stats_struct ( ) -> Action {
650629 let mut config = HashMap :: new ( ) ;
@@ -682,47 +661,36 @@ fn create_add_action_with_stats(path: &str, num_records: i64) -> Action {
682661 } )
683662}
684663
685- /// Tests checkpoint_data with writeStatsAsStruct enabled.
686- /// Verifies that the output schema includes stats_parsed.
687- #[ tokio:: test]
688- async fn test_checkpoint_data_struct_enabled( ) -> DeltaResult <( ) > {
664+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 2 ) ]
665+ async fn test_snapshot_checkpoint ( ) -> DeltaResult < ( ) > {
689666 let ( store, _) = new_in_memory_store ( ) ;
690- let engine = DefaultEngineBuilder :: new( store. clone( ) ) . build( ) ;
667+ let executor = Arc :: new ( TokioMultiThreadExecutor :: new (
668+ tokio:: runtime:: Handle :: current ( ) ,
669+ ) ) ;
670+ let engine = DefaultEngineBuilder :: new ( store. clone ( ) )
671+ . with_task_executor ( executor)
672+ . build ( ) ;
691673
692- // 1st commit: protocol + metadata with writeStatsAsStruct=true
674+ // Version 0: metadata & protocol
693675 write_commit_to_store (
694676 & store,
695- vec ! [
696- create_basic_protocol_action( ) ,
697- create_metadata_action_with_stats_struct( ) ,
698- ] ,
699- >>>>>>> 9 df95006 ( write-integration)
677+ vec ! [ create_metadata_action( ) , create_basic_protocol_action( ) ] ,
700678 0 ,
701679 )
702680 . await ?;
703681
704- <<<<<<< HEAD
705682 // Version 1: add 3 files
706683 write_commit_to_store (
707684 & store,
708685 vec ! [
709686 create_add_action( "file1.parquet" ) ,
710687 create_add_action( "file2.parquet" ) ,
711688 create_add_action( "file3.parquet" ) ,
712- =======
713- // 2nd commit: add actions with JSON stats
714- write_commit_to_store(
715- & store,
716- vec![
717- create_add_action_with_stats( "file1.parquet" , 100 ) ,
718- create_add_action_with_stats( "file2.parquet" , 200 ) ,
719- >>>>>>> 9 df95006 ( write-integration)
720689 ] ,
721690 1 ,
722691 )
723692 . await ?;
724693
725- <<<<<<< HEAD
726694 // Version 2: add 2 more files, remove 1
727695 write_commit_to_store (
728696 & store,
@@ -755,7 +723,75 @@ async fn test_checkpoint_data_struct_enabled() -> DeltaResult<()> {
755723 create_add_action( "file8.parquet" ) ,
756724 ] ,
757725 4 ,
758- =======
726+ )
727+ . await ?;
728+
729+ let table_root = Url :: parse ( "memory:///" ) ?;
730+ let snapshot = Snapshot :: builder_for ( table_root. clone ( ) ) . build ( & engine) ?;
731+
732+ snapshot. checkpoint ( & engine) ?;
733+
734+ // First checkpoint: 1 metadata + 1 protocol + 5 add + 3 remove = 10, numOfAddFiles = 5
735+ let checkpoint_path = Path :: from ( "_delta_log/00000000000000000004.checkpoint.parquet" ) ;
736+ let checkpoint_size = store. head ( & checkpoint_path) . await ?. size ;
737+ assert_last_checkpoint_contents ( & store, 4 , 10 , 5 , checkpoint_size) . await ?;
738+
739+ // Version 5: add 2 files, remove 1
740+ write_commit_to_store (
741+ & store,
742+ vec ! [
743+ create_add_action( "file9.parquet" ) ,
744+ create_add_action( "file10.parquet" ) ,
745+ create_remove_action( "file4.parquet" ) ,
746+ ] ,
747+ 5 ,
748+ )
749+ . await ?;
750+
751+ // Version 6: add 1 file
752+ write_commit_to_store ( & store, vec ! [ create_add_action( "file11.parquet" ) ] , 6 ) . await ?;
753+
754+ let snapshot = Snapshot :: builder_for ( table_root) . build ( & engine) ?;
755+
756+ snapshot. checkpoint ( & engine) ?;
757+
758+ // Second checkpoint: 1 metadata + 1 protocol + 7 add + 4 remove = 13, numOfAddFiles = 7
759+ let checkpoint_path = Path :: from ( "_delta_log/00000000000000000006.checkpoint.parquet" ) ;
760+ let checkpoint_size = store. head ( & checkpoint_path) . await ?. size ;
761+ assert_last_checkpoint_contents ( & store, 6 , 13 , 7 , checkpoint_size) . await ?;
762+
763+ Ok ( ( ) )
764+ }
765+
766+ /// Tests checkpoint_data with writeStatsAsStruct enabled.
767+ /// Verifies that the output schema includes stats_parsed.
768+ #[ tokio:: test]
769+ async fn test_checkpoint_data_struct_enabled ( ) -> DeltaResult < ( ) > {
770+ let ( store, _) = new_in_memory_store ( ) ;
771+ let engine = DefaultEngineBuilder :: new ( store. clone ( ) ) . build ( ) ;
772+
773+ // 1st commit: protocol + metadata with writeStatsAsStruct=true
774+ write_commit_to_store (
775+ & store,
776+ vec ! [
777+ create_basic_protocol_action( ) ,
778+ create_metadata_action_with_stats_struct( ) ,
779+ ] ,
780+ 0 ,
781+ )
782+ . await ?;
783+
784+ // 2nd commit: add actions with JSON stats
785+ write_commit_to_store (
786+ & store,
787+ vec ! [
788+ create_add_action_with_stats( "file1.parquet" , 100 ) ,
789+ create_add_action_with_stats( "file2.parquet" , 200 ) ,
790+ ] ,
791+ 1 ,
792+ )
793+ . await ?;
794+
759795 let table_root = Url :: parse ( "memory:///" ) ?;
760796 let snapshot = Snapshot :: builder_for ( table_root) . build ( & engine) ?;
761797 let writer = snapshot. create_checkpoint_writer ( ) ?;
@@ -812,45 +848,10 @@ async fn test_checkpoint_data_default_settings() -> DeltaResult<()> {
812848 & store,
813849 vec ! [ create_add_action_with_stats( "file1.parquet" , 100 ) ] ,
814850 1 ,
815- >>>>>>> 9 df95006 ( write-integration)
816851 )
817852 . await ?;
818853
819854 let table_root = Url :: parse ( "memory:///" ) ?;
820- <<<<<<< HEAD
821- let snapshot = Snapshot :: builder_for( table_root. clone( ) ) . build( & engine) ?;
822-
823- snapshot. checkpoint( & engine) ?;
824-
825- // First checkpoint: 1 metadata + 1 protocol + 5 add + 3 remove = 10, numOfAddFiles = 5
826- let checkpoint_path = Path :: from( "_delta_log/00000000000000000004.checkpoint.parquet" ) ;
827- let checkpoint_size = store. head( & checkpoint_path) . await ?. size;
828- assert_last_checkpoint_contents( & store, 4 , 10 , 5 , checkpoint_size) . await ?;
829-
830- // Version 5: add 2 files, remove 1
831- write_commit_to_store(
832- & store,
833- vec![
834- create_add_action( "file9.parquet" ) ,
835- create_add_action( "file10.parquet" ) ,
836- create_remove_action( "file4.parquet" ) ,
837- ] ,
838- 5 ,
839- )
840- . await ?;
841-
842- // Version 6: add 1 file
843- write_commit_to_store( & store, vec![ create_add_action( "file11.parquet" ) ] , 6 ) . await ?;
844-
845- let snapshot = Snapshot :: builder_for( table_root) . build( & engine) ?;
846-
847- snapshot. checkpoint( & engine) ?;
848-
849- // Second checkpoint: 1 metadata + 1 protocol + 7 add + 4 remove = 13, numOfAddFiles = 7
850- let checkpoint_path = Path :: from( "_delta_log/00000000000000000006.checkpoint.parquet" ) ;
851- let checkpoint_size = store. head( & checkpoint_path) . await ?. size;
852- assert_last_checkpoint_contents( & store, 6 , 13 , 7 , checkpoint_size) . await ?;
853- =======
854855 let snapshot = Snapshot :: builder_for ( table_root) . build ( & engine) ?;
855856 let writer = snapshot. create_checkpoint_writer ( ) ?;
856857
@@ -1096,7 +1097,6 @@ async fn test_all_stats_config_combinations() -> DeltaResult<()> {
10961097 struct2
10971098 ) ;
10981099 }
1099- >>>>>>> 9 df95006 ( write-integration)
11001100
11011101 Ok ( ( ) )
11021102}
0 commit comments