@@ -364,6 +364,10 @@ fn list_log_files_with_checkpoint(
364
364
365
365
if max_checkpoint_version != checkpoint_metadata. version {
366
366
warn ! ( "_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}" , checkpoint_metadata. version, max_checkpoint_version) ;
367
+ // we may need to drop some commits that are after the actual last checkpoint
368
+ commit_files. retain ( |commit_meta| {
369
+ version_from_location ( & commit_meta. location ) . unwrap_or ( 0 ) > max_checkpoint_version
370
+ } ) ;
367
371
} else if checkpoint_files. len ( ) != checkpoint_metadata. parts . unwrap_or ( 1 ) as usize {
368
372
return Err ( Error :: Generic ( format ! (
369
373
"_last_checkpoint indicated that checkpoint should have {} parts, but it has {}" ,
@@ -502,6 +506,72 @@ mod tests {
502
506
assert ! ( cp. is_none( ) )
503
507
}
504
508
509
+ #[ test]
510
+ fn test_read_log_with_out_of_date_last_checkpoint ( ) {
511
+ // in memory file system
512
+ let store = Arc :: new ( InMemory :: new ( ) ) ;
513
+
514
+ fn get_path ( index : usize , suffix : & str ) -> Path {
515
+ let path = format ! ( "_delta_log/{index:020}.{suffix}" ) ;
516
+ Path :: from ( path. as_str ( ) )
517
+ }
518
+ let data = bytes:: Bytes :: from ( "kernel-data" ) ;
519
+
520
+ // put commit files
521
+ tokio:: runtime:: Runtime :: new ( )
522
+ . expect ( "create tokio runtime" )
523
+ . block_on ( async {
524
+ for path in [
525
+ get_path ( 0 , "json" ) ,
526
+ get_path ( 1 , "checkpoint.parquet" ) ,
527
+ get_path ( 2 , "json" ) ,
528
+ get_path ( 3 , "checkpoint.parquet" ) ,
529
+ get_path ( 4 , "json" ) ,
530
+ get_path ( 5 , "checkpoint.parquet" ) ,
531
+ get_path ( 6 , "json" ) ,
532
+ get_path ( 7 , "json" ) ,
533
+ ] {
534
+ store
535
+ . put ( & path, data. clone ( ) . into ( ) )
536
+ . await
537
+ . expect ( "put _last_checkpoint" ) ;
538
+ }
539
+ } ) ;
540
+
541
+ let client = ObjectStoreFileSystemClient :: new (
542
+ store,
543
+ Path :: from ( "/" ) ,
544
+ Arc :: new ( TokioBackgroundExecutor :: new ( ) ) ,
545
+ ) ;
546
+
547
+ let checkpoint_metadata = CheckpointMetadata {
548
+ version : 3 ,
549
+ size : 10 ,
550
+ parts : None ,
551
+ size_in_bytes : None ,
552
+ num_of_add_files : None ,
553
+ checkpoint_schema : None ,
554
+ checksum : None ,
555
+ } ;
556
+ let url = Url :: parse ( "memory:///_delta_log/" ) . expect ( "valid url" ) ;
557
+ let ( commit_files, checkpoint_files) =
558
+ list_log_files_with_checkpoint ( & checkpoint_metadata, & client, & url) . unwrap ( ) ;
559
+ assert_eq ! ( checkpoint_files. len( ) , 1 ) ;
560
+ assert_eq ! ( commit_files. len( ) , 2 ) ;
561
+ assert_eq ! (
562
+ version_from_location( & checkpoint_files[ 0 ] . location) . unwrap_or( 0 ) ,
563
+ 5
564
+ ) ;
565
+ assert_eq ! (
566
+ version_from_location( & commit_files[ 0 ] . location) . unwrap_or( 0 ) ,
567
+ 7
568
+ ) ;
569
+ assert_eq ! (
570
+ version_from_location( & commit_files[ 1 ] . location) . unwrap_or( 0 ) ,
571
+ 6
572
+ ) ;
573
+ }
574
+
505
575
fn valid_last_checkpoint ( ) -> Vec < u8 > {
506
576
r#"{"size":8,"size_in_bytes":21857,"version":1}"# . as_bytes ( ) . to_vec ( )
507
577
}
0 commit comments