Fix bug with out of date last checkpoint, and clean listing function #354

Merged 15 commits on Oct 2, 2024
Changes from 3 commits
143 changes: 112 additions & 31 deletions kernel/src/snapshot.rs
```diff
@@ -321,47 +321,63 @@ fn read_last_checkpoint(

 /// List all log files after a given checkpoint.
 fn list_log_files_with_checkpoint(
-    cp: &CheckpointMetadata,
+    checkpoint_metadata: &CheckpointMetadata,
     fs_client: &dyn FileSystemClient,
     log_root: &Url,
 ) -> DeltaResult<(Vec<FileMeta>, Vec<FileMeta>)> {
-    let version_prefix = format!("{:020}", cp.version);
+    let version_prefix = format!("{:020}", checkpoint_metadata.version);
     let start_from = log_root.join(&version_prefix)?;

-    let files = fs_client
-        .list_from(&start_from)?
-        .collect::<Result<Vec<_>, Error>>()?
-        .into_iter()
+    let mut max_checkpoint_version = checkpoint_metadata.version;
+    let mut checkpoint_files = Vec::with_capacity(10);
```
Review thread on `Vec::with_capacity(10)`:

Collaborator: Just curious, any reason why you expect the Vec to be at most size 10?

Collaborator: That is odd, because most checkpoints will have just one file? I wonder if the intent was to pre-size the commit_files to 10, since that's the default checkpoint interval for delta-spark?

Author: Yeah, that makes sense. I've swapped them so we reserve 10 for commits.
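As context for the pre-sizing discussed above, a minimal standalone sketch (not the kernel's code) of how `Vec::with_capacity` behaves; the file names are made up:

```rust
fn main() {
    // with_capacity reserves space up front without changing the length;
    // pushes within that capacity do not reallocate. 10 mirrors
    // delta-spark's default checkpoint interval, i.e. the expected number
    // of commit files between two checkpoints.
    let mut commit_files: Vec<String> = Vec::with_capacity(10);
    assert_eq!(commit_files.len(), 0);
    assert!(commit_files.capacity() >= 10);

    for v in 0..10u64 {
        commit_files.push(format!("{v:020}.json"));
    }
    // Still within the initial reservation: no reallocation was required.
    assert_eq!(commit_files.len(), 10);
    assert!(commit_files.capacity() >= 10);
}
```

Capacity is a reservation rather than a limit, which is why pre-sizing the commits vector (not the checkpoints one) matches the expected workload.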

```diff
+    let mut commit_files = vec![];
+
+    for meta_res in fs_client.list_from(&start_from)? {
+        let meta = meta_res?;
+        let path = LogPath::new(&meta.location);
         // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
-        .filter(|f| version_from_location(&f.location).is_some())
-        .collect::<Vec<_>>();
-
-    let mut commit_files = files
-        .iter()
-        .filter_map(|f| {
-            if LogPath::new(&f.location).is_commit {
-                Some(f.clone())
-            } else {
-                None
+        if path.version.is_some() {
+            if path.is_commit {
+                commit_files.push(meta);
+            } else if path.is_checkpoint {
+                let version = path.version.unwrap_or(0);
+                match version.cmp(&max_checkpoint_version) {
+                    Ordering::Greater => {
+                        max_checkpoint_version = version;
+                        checkpoint_files.clear();
+                        checkpoint_files.push(meta);
+                    }
+                    Ordering::Equal => checkpoint_files.push(meta),
+                    Ordering::Less => {}
+                }
             }
-        })
-        .collect_vec();
+        }
+    }

-    // NOTE this will sort in reverse order
-    commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+    checkpoint_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+    if checkpoint_files.is_empty() {
+        return Err(Error::generic(
+            "Had a _last_checkpoint hint but didn't find any checkpoints",
+        ));
+    }

-    let checkpoint_files = files
-        .iter()
-        .filter_map(|f| {
-            if LogPath::new(&f.location).is_checkpoint {
-                Some(f.clone())
-            } else {
-                None
-            }
-        })
-        .collect_vec();
+    if max_checkpoint_version != checkpoint_metadata.version {
+        warn!("_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}", checkpoint_metadata.version, max_checkpoint_version);
+        // we may need to drop some commits that are after the actual last checkpoint
+        commit_files.retain(|commit_meta| {
+            version_from_location(&commit_meta.location).unwrap_or(0) > max_checkpoint_version
+        });
+    } else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) as usize {
```
Review thread on the `parts` comparison:

Collaborator: Does this work?

Suggested change:
```diff
-    } else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) as usize {
+    } else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1usize) {
```
(replace usize with the suffix for whatever type parts has)?

Author: No. The issue is that checkpoint_metadata.parts was an i32, so the 1 was implicitly an i32, and trying to make it a usize doesn't work because the cast needs to happen after the unwrap.

Anyway, since the reason we have parts is to compare it against a vector len, I made it a usize in the struct, and we don't have to cast. It means serde will fail if there's a negative number in the JSON, but that's probably okay since that's a broken _last_checkpoint anyway.
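The author's point about where the cast must sit can be illustrated with a standalone sketch (`parts` here is a hypothetical stand-in, not the kernel's struct field):

```rust
fn main() {
    // As it was: parts is Option<i32>, so the literal 1 in unwrap_or(1) is
    // inferred as i32, and the cast to usize can only be applied to the
    // result of the whole unwrap_or expression, not to the literal alone.
    let parts: Option<i32> = None;
    let checkpoint_files_len: usize = 1;
    assert!(checkpoint_files_len == parts.unwrap_or(1) as usize);

    // After the change adopted in this PR: storing parts as Option<usize>
    // removes the cast at the comparison site entirely.
    let parts_fixed: Option<usize> = None;
    assert!(checkpoint_files_len == parts_fixed.unwrap_or(1));
}
```

Writing `unwrap_or(1usize)` on an `Option<i32>` would fail to compile because the argument's type must match the option's payload, which is why changing the struct field was the cleaner fix.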

```diff
+        return Err(Error::Generic(format!(
+            "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
+            checkpoint_metadata.parts.unwrap_or(1),
+            checkpoint_files.len()
+        )));
+    }

-    // TODO raise a proper error
-    assert_eq!(checkpoint_files.len(), cp.parts.unwrap_or(1) as usize);
+    // NOTE this will sort in reverse order
+    commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));

     Ok((commit_files, checkpoint_files))
 }
```
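The rewritten single-pass loop can be sketched in isolation. This is a toy model with hypothetical file names and a simple `version_of` parser standing in for the kernel's `LogPath`, assuming a stale hint at version 3:

```rust
use std::cmp::Ordering;

// Hypothetical helper: parse the 20-digit version prefix from a log file name.
fn version_of(name: &str) -> Option<u64> {
    name.split('.').next()?.parse().ok()
}

fn main() {
    // File names as a store listing might return them, in lexical order,
    // starting from the hinted checkpoint version.
    let listing = [
        "00000000000000000003.checkpoint.parquet",
        "00000000000000000004.json",
        "00000000000000000005.checkpoint.parquet",
        "00000000000000000006.json",
        "00000000000000000007.json",
    ];

    let mut max_checkpoint_version = 3u64; // version from the (stale) hint
    let mut checkpoint_files: Vec<&str> = Vec::new();
    let mut commit_files: Vec<&str> = Vec::with_capacity(10);

    for name in listing {
        let Some(version) = version_of(name) else { continue };
        if name.ends_with(".json") {
            commit_files.push(name);
        } else if name.contains("checkpoint") {
            match version.cmp(&max_checkpoint_version) {
                Ordering::Greater => {
                    // A newer checkpoint supersedes everything collected so far.
                    max_checkpoint_version = version;
                    checkpoint_files.clear();
                    checkpoint_files.push(name);
                }
                Ordering::Equal => checkpoint_files.push(name),
                Ordering::Less => {}
            }
        }
    }
    // Drop commits at or before the newest checkpoint actually found.
    commit_files.retain(|c| version_of(c).unwrap_or(0) > max_checkpoint_version);

    assert_eq!(max_checkpoint_version, 5);
    assert_eq!(checkpoint_files, ["00000000000000000005.checkpoint.parquet"]);
    assert_eq!(commit_files, ["00000000000000000006.json", "00000000000000000007.json"]);
}
```

The key property, matching the bug fix above: even though the hint said version 3, the scan discovers checkpoint 5 and retains only commits 6 and 7.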
```diff
@@ -490,6 +506,71 @@ mod tests {
         assert!(cp.is_none())
     }

+    #[test]
+    fn test_read_log_with_out_of_date_last_checkpoint() {
+        let store = Arc::new(InMemory::new());
+
+        fn get_path(index: usize, suffix: &str) -> Path {
+            let path = format!("_delta_log/{index:020}.{suffix}");
+            Path::from(path.as_str())
+        }
+        let data = bytes::Bytes::from("kernel-data");
+
+        // add log files to store
+        tokio::runtime::Runtime::new()
+            .expect("create tokio runtime")
+            .block_on(async {
+                for path in [
+                    get_path(0, "json"),
+                    get_path(1, "checkpoint.parquet"),
+                    get_path(2, "json"),
+                    get_path(3, "checkpoint.parquet"),
+                    get_path(4, "json"),
+                    get_path(5, "checkpoint.parquet"),
+                    get_path(6, "json"),
+                    get_path(7, "json"),
+                ] {
+                    store
+                        .put(&path, data.clone().into())
+                        .await
+                        .expect("put log file in store");
+                }
+            });
+
+        let client = ObjectStoreFileSystemClient::new(
+            store,
+            Path::from("/"),
+            Arc::new(TokioBackgroundExecutor::new()),
+        );
+
+        let checkpoint_metadata = CheckpointMetadata {
+            version: 3,
+            size: 10,
+            parts: None,
+            size_in_bytes: None,
+            num_of_add_files: None,
+            checkpoint_schema: None,
+            checksum: None,
+        };
+        let url = Url::parse("memory:///_delta_log/").expect("valid url");
+        let (commit_files, checkpoint_files) =
+            list_log_files_with_checkpoint(&checkpoint_metadata, &client, &url).unwrap();
+        assert_eq!(checkpoint_files.len(), 1);
+        assert_eq!(commit_files.len(), 2);
+        assert_eq!(
+            version_from_location(&checkpoint_files[0].location).unwrap_or(0),
+            5
+        );
+        assert_eq!(
+            version_from_location(&commit_files[0].location).unwrap_or(0),
+            7
+        );
+        assert_eq!(
+            version_from_location(&commit_files[1].location).unwrap_or(0),
+            6
+        );
+    }
+
     fn valid_last_checkpoint() -> Vec<u8> {
         r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
     }
```
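The `{index:020}` formatting in the test's `get_path` matters because Delta log listing relies on lexical order of file names; a minimal sketch of why the zero-padding is needed:

```rust
fn main() {
    // Versions are zero-padded to 20 digits so that lexical ordering of
    // file names agrees with numeric ordering of versions.
    let name = format!("{:020}.json", 7u64);
    assert_eq!(name, "00000000000000000007.json");

    // Without padding, string comparison disagrees with numeric order:
    assert!("10" < "7");
    // With padding, string comparison matches numeric comparison, so a
    // range listing starting at the checkpoint's prefix skips all earlier files.
    assert!(format!("{:020}", 7u64) < format!("{:020}", 10u64));
}
```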