Skip to content

Commit

Permalink
Fix bug with out of date last checkpoint, and clean listing function (d…
Browse files Browse the repository at this point in the history
…elta-io#354)

If we have a `_last_checkpoint` that is out of date, things can get
confused. This code:

1. Cleans up the listing function a bit
2. Ensures we end up with the real latest checkpoint
3. Drops any commit files from the listing that are older than the last
checkpoint
4. `warns!` if ` _last_checkpoint` is out of date
5. Adds a test for this case

This code will conflict with delta-io#347, so maybe hold of merging until that
merges and then I can rebase and clean this up more.

---------

Co-authored-by: Nick Lanham <[email protected]>
Co-authored-by: Zach Schuermann <[email protected]>
Co-authored-by: Ryan Johnson <[email protected]>
Co-authored-by: Stephen Carman <[email protected]>
  • Loading branch information
5 people authored and OussamaSaoudi-db committed Oct 7, 2024
1 parent fbf70a6 commit dd578d3
Showing 1 changed file with 118 additions and 14 deletions.
132 changes: 118 additions & 14 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ struct CheckpointMetadata {
/// The number of actions that are stored in the checkpoint.
pub(crate) size: i64,
/// The number of fragments if the last checkpoint was written in multiple parts.
pub(crate) parts: Option<i32>,
pub(crate) parts: Option<usize>,
/// The number of bytes of the checkpoint.
pub(crate) size_in_bytes: Option<i64>,
/// The number of AddFile actions in the checkpoint.
Expand Down Expand Up @@ -321,29 +321,67 @@ fn read_last_checkpoint(

/// List all log files after a given checkpoint.
fn list_log_files_with_checkpoint(
cp: &CheckpointMetadata,
checkpoint_metadata: &CheckpointMetadata,
fs_client: &dyn FileSystemClient,
log_root: &Url,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
let version_prefix = format!("{:020}", cp.version);
let version_prefix = format!("{:020}", checkpoint_metadata.version);
let start_from = log_root.join(&version_prefix)?;

let files: Vec<ParsedLogPath> = fs_client
.list_from(&start_from)?
.flat_map(|file| file.and_then(ParsedLogPath::try_from).transpose())
.try_collect()?;
let (mut commit_files, checkpoint_files): (Vec<_>, _) = files
.into_iter()
let mut max_checkpoint_version = checkpoint_metadata.version;
let mut checkpoint_files = vec![];
// We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
// on config at some point
let mut commit_files = Vec::with_capacity(10);

for meta_res in fs_client.list_from(&start_from)? {
let meta = meta_res?;
let parsed_path = ParsedLogPath::try_from(meta)?;
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
.filter(|p| p.is_commit() || p.is_checkpoint())
.partition(ParsedLogPath::is_commit);
if let Some(parsed_path) = parsed_path {
if parsed_path.is_commit() {
commit_files.push(parsed_path);
} else if parsed_path.is_checkpoint() {
match parsed_path.version.cmp(&max_checkpoint_version) {
Ordering::Greater => {
max_checkpoint_version = parsed_path.version;
checkpoint_files.clear();
checkpoint_files.push(parsed_path);
}
Ordering::Equal => checkpoint_files.push(parsed_path),
Ordering::Less => {}
}
}
}
}

if checkpoint_files.is_empty() {
// TODO: We could potentially recover here
return Err(Error::generic(
"Had a _last_checkpoint hint but didn't find any checkpoints",
));
}

if max_checkpoint_version != checkpoint_metadata.version {
warn!(
"_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
checkpoint_metadata.version,
max_checkpoint_version
);
// we (may) need to drop commits that are before the _actual_ last checkpoint (that
// is, commits between a stale _last_checkpoint and the _actual_ last checkpoint)
commit_files.retain(|parsed_path| parsed_path.version > max_checkpoint_version);
} else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) {
return Err(Error::Generic(format!(
"_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
checkpoint_metadata.parts.unwrap_or(1),
checkpoint_files.len()
)));
}

// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));

// TODO raise a proper error
assert_eq!(checkpoint_files.len(), cp.parts.unwrap_or(1) as usize);

Ok((commit_files, checkpoint_files))
}

Expand Down Expand Up @@ -471,6 +509,72 @@ mod tests {
assert!(cp.is_none())
}

#[test]
fn test_read_log_with_out_of_date_last_checkpoint() {
let store = Arc::new(InMemory::new());

fn get_path(index: usize, suffix: &str) -> Path {
let path = format!("_delta_log/{index:020}.{suffix}");
Path::from(path.as_str())
}
let data = bytes::Bytes::from("kernel-data");

let checkpoint_metadata = CheckpointMetadata {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
};

// add log files to store
tokio::runtime::Runtime::new()
.expect("create tokio runtime")
.block_on(async {
for path in [
get_path(0, "json"),
get_path(1, "checkpoint.parquet"),
get_path(2, "json"),
get_path(3, "checkpoint.parquet"),
get_path(4, "json"),
get_path(5, "checkpoint.parquet"),
get_path(6, "json"),
get_path(7, "json"),
] {
store
.put(&path, data.clone().into())
.await
.expect("put log file in store");
}
let checkpoint_str =
serde_json::to_string(&checkpoint_metadata).expect("Serialize checkpoint");
store
.put(
&Path::from("_delta_log/_last_checkpoint"),
checkpoint_str.into(),
)
.await
.expect("Write _last_checkpoint");
});

let client = ObjectStoreFileSystemClient::new(
store,
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);

let url = Url::parse("memory:///_delta_log/").expect("valid url");
let (commit_files, checkpoint_files) =
list_log_files_with_checkpoint(&checkpoint_metadata, &client, &url).unwrap();
assert_eq!(checkpoint_files.len(), 1);
assert_eq!(commit_files.len(), 2);
assert_eq!(checkpoint_files[0].version, 5);
assert_eq!(commit_files[0].version, 7);
assert_eq!(commit_files[1].version, 6);
}

fn valid_last_checkpoint() -> Vec<u8> {
r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
}
Expand Down

0 comments on commit dd578d3

Please sign in to comment.