From c81da02491bee7bd77e1eb4e132fe748c822a61d Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 2 Oct 2024 15:29:58 -0700 Subject: [PATCH] Fix bug with out of date last checkpoint, and clean listing function (#354) If we have a `_last_checkpoint` that is out of date, things can get confused. This code: 1. Cleans up the listing function a bit 2. Ensures we end up with the real latest checkpoint 3. Drops any commit files from the listing that are older than the last checkpoint 4. `warn!`s if `_last_checkpoint` is out of date 5. Adds a test for this case This code will conflict with #347, so maybe hold off on merging until that merges and then I can rebase and clean this up more. --------- Co-authored-by: Nick Lanham Co-authored-by: Zach Schuermann Co-authored-by: Ryan Johnson Co-authored-by: Stephen Carman --- kernel/src/snapshot.rs | 132 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 118 insertions(+), 14 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 55acbd2c3..09dde8e76 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -283,7 +283,7 @@ struct CheckpointMetadata { /// The number of actions that are stored in the checkpoint. pub(crate) size: i64, /// The number of fragments if the last checkpoint was written in multiple parts. - pub(crate) parts: Option, + pub(crate) parts: Option, /// The number of bytes of the checkpoint. pub(crate) size_in_bytes: Option, /// The number of AddFile actions in the checkpoint. @@ -321,29 +321,67 @@ fn read_last_checkpoint( /// List all log files after a given checkpoint. 
fn list_log_files_with_checkpoint( - cp: &CheckpointMetadata, + checkpoint_metadata: &CheckpointMetadata, fs_client: &dyn FileSystemClient, log_root: &Url, ) -> DeltaResult<(Vec, Vec)> { - let version_prefix = format!("{:020}", cp.version); + let version_prefix = format!("{:020}", checkpoint_metadata.version); let start_from = log_root.join(&version_prefix)?; - let files: Vec = fs_client - .list_from(&start_from)? - .flat_map(|file| file.and_then(ParsedLogPath::try_from).transpose()) - .try_collect()?; - let (mut commit_files, checkpoint_files): (Vec<_>, _) = files - .into_iter() + let mut max_checkpoint_version = checkpoint_metadata.version; + let mut checkpoint_files = vec![]; + // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based + // on config at some point + let mut commit_files = Vec::with_capacity(10); + + for meta_res in fs_client.list_from(&start_from)? { + let meta = meta_res?; + let parsed_path = ParsedLogPath::try_from(meta)?; // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files? - .filter(|p| p.is_commit() || p.is_checkpoint()) - .partition(ParsedLogPath::is_commit); + if let Some(parsed_path) = parsed_path { + if parsed_path.is_commit() { + commit_files.push(parsed_path); + } else if parsed_path.is_checkpoint() { + match parsed_path.version.cmp(&max_checkpoint_version) { + Ordering::Greater => { + max_checkpoint_version = parsed_path.version; + checkpoint_files.clear(); + checkpoint_files.push(parsed_path); + } + Ordering::Equal => checkpoint_files.push(parsed_path), + Ordering::Less => {} + } + } + } + } + + if checkpoint_files.is_empty() { + // TODO: We could potentially recover here + return Err(Error::generic( + "Had a _last_checkpoint hint but didn't find any checkpoints", + )); + } + + if max_checkpoint_version != checkpoint_metadata.version { + warn!( + "_last_checkpoint hint is out of date. _last_checkpoint version: {}. 
Using actual most recent: {}", + checkpoint_metadata.version, + max_checkpoint_version + ); + // we (may) need to drop commits that are before the _actual_ last checkpoint (that + // is, commits between a stale _last_checkpoint and the _actual_ last checkpoint) + commit_files.retain(|parsed_path| parsed_path.version > max_checkpoint_version); + } else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) { + return Err(Error::Generic(format!( + "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}", + checkpoint_metadata.parts.unwrap_or(1), + checkpoint_files.len() + ))); + } // NOTE this will sort in reverse order commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version)); - // TODO raise a proper error - assert_eq!(checkpoint_files.len(), cp.parts.unwrap_or(1) as usize); - Ok((commit_files, checkpoint_files)) } @@ -471,6 +509,72 @@ mod tests { assert!(cp.is_none()) } + #[test] + fn test_read_log_with_out_of_date_last_checkpoint() { + let store = Arc::new(InMemory::new()); + + fn get_path(index: usize, suffix: &str) -> Path { + let path = format!("_delta_log/{index:020}.{suffix}"); + Path::from(path.as_str()) + } + let data = bytes::Bytes::from("kernel-data"); + + let checkpoint_metadata = CheckpointMetadata { + version: 3, + size: 10, + parts: None, + size_in_bytes: None, + num_of_add_files: None, + checkpoint_schema: None, + checksum: None, + }; + + // add log files to store + tokio::runtime::Runtime::new() + .expect("create tokio runtime") + .block_on(async { + for path in [ + get_path(0, "json"), + get_path(1, "checkpoint.parquet"), + get_path(2, "json"), + get_path(3, "checkpoint.parquet"), + get_path(4, "json"), + get_path(5, "checkpoint.parquet"), + get_path(6, "json"), + get_path(7, "json"), + ] { + store + .put(&path, data.clone().into()) + .await + .expect("put log file in store"); + } + let checkpoint_str = + serde_json::to_string(&checkpoint_metadata).expect("Serialize checkpoint"); + store + .put( + 
&Path::from("_delta_log/_last_checkpoint"), + checkpoint_str.into(), + ) + .await + .expect("Write _last_checkpoint"); + }); + + let client = ObjectStoreFileSystemClient::new( + store, + Path::from("/"), + Arc::new(TokioBackgroundExecutor::new()), + ); + + let url = Url::parse("memory:///_delta_log/").expect("valid url"); + let (commit_files, checkpoint_files) = + list_log_files_with_checkpoint(&checkpoint_metadata, &client, &url).unwrap(); + assert_eq!(checkpoint_files.len(), 1); + assert_eq!(commit_files.len(), 2); + assert_eq!(checkpoint_files[0].version, 5); + assert_eq!(commit_files[0].version, 7); + assert_eq!(commit_files[1].version, 6); + } + fn valid_last_checkpoint() -> Vec { r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec() }