Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug with out of date last checkpoint, and clean listing function #354

Merged
merged 15 commits into from
Oct 2, 2024
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 104 additions & 13 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,29 +321,64 @@ fn read_last_checkpoint(

/// List all log files after a given checkpoint.
fn list_log_files_with_checkpoint(
cp: &CheckpointMetadata,
checkpoint_metadata: &CheckpointMetadata,
fs_client: &dyn FileSystemClient,
log_root: &Url,
) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
let version_prefix = format!("{:020}", cp.version);
let version_prefix = format!("{:020}", checkpoint_metadata.version);
let start_from = log_root.join(&version_prefix)?;

let files: Vec<ParsedLogPath> = fs_client
.list_from(&start_from)?
.flat_map(|file| file.and_then(ParsedLogPath::try_from).transpose())
.try_collect()?;
let (mut commit_files, checkpoint_files): (Vec<_>, _) = files
.into_iter()
let mut max_checkpoint_version = checkpoint_metadata.version;
let mut checkpoint_files = vec![];
let mut commit_files = Vec::with_capacity(10);
nicklan marked this conversation as resolved.
Show resolved Hide resolved

for meta_res in fs_client.list_from(&start_from)? {
let meta = meta_res?;
let parsed_path = ParsedLogPath::try_from(meta)?;
// TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
.filter(|p| p.is_commit() || p.is_checkpoint())
.partition(ParsedLogPath::is_commit);
if let Some(parsed_path) = parsed_path {
if parsed_path.is_commit() {
commit_files.push(parsed_path);
} else if parsed_path.is_checkpoint() {
match parsed_path.version.cmp(&max_checkpoint_version) {
Ordering::Greater => {
max_checkpoint_version = parsed_path.version;
checkpoint_files.clear();
checkpoint_files.push(parsed_path);
}
Ordering::Equal => checkpoint_files.push(parsed_path),
Ordering::Less => {}
}
}
}
}

if checkpoint_files.is_empty() {
// TODO: We could potentially recover here
nicklan marked this conversation as resolved.
Show resolved Hide resolved
return Err(Error::generic(
"Had a _last_checkpoint hint but didn't find any checkpoints",
));
}

if max_checkpoint_version != checkpoint_metadata.version {
warn!(
"_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
checkpoint_metadata.version,
max_checkpoint_version
);
// we may need to drop some commits that are after the actual last checkpoint
nicklan marked this conversation as resolved.
Show resolved Hide resolved
commit_files.retain(|parsed_path| parsed_path.version > max_checkpoint_version);
} else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) as usize {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work?

Suggested change
} else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) as usize {
} else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1usize) {

(replace usize with the suffix for whatever type parts has)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. the issue is that checkpoint_metadata.parts was an i32, so the 1 was implicitly an i32, and trying to make it a usize doesn't work because the cast needs to happen after the unwrap.

Anyway, since the reason we have parts is to compare it against a vector len, I made it a usize in the struct, and we don't have to cast. It means serde will fail if there's a negative number in the json, but that's probably okay since that's a broken _last_checkpoint anyway.

return Err(Error::Generic(format!(
"_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
checkpoint_metadata.parts.unwrap_or(1),
checkpoint_files.len()
)));
}

// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));

// TODO raise a proper error
assert_eq!(checkpoint_files.len(), cp.parts.unwrap_or(1) as usize);

Ok((commit_files, checkpoint_files))
}

Expand Down Expand Up @@ -471,6 +506,62 @@ mod tests {
assert!(cp.is_none())
}

#[test]
fn test_read_log_with_out_of_date_last_checkpoint() {
let store = Arc::new(InMemory::new());

fn get_path(index: usize, suffix: &str) -> Path {
let path = format!("_delta_log/{index:020}.{suffix}");
Path::from(path.as_str())
}
let data = bytes::Bytes::from("kernel-data");

// add log files to store
tokio::runtime::Runtime::new()
.expect("create tokio runtime")
.block_on(async {
for path in [
get_path(0, "json"),
get_path(1, "checkpoint.parquet"),
get_path(2, "json"),
get_path(3, "checkpoint.parquet"),
get_path(4, "json"),
get_path(5, "checkpoint.parquet"),
get_path(6, "json"),
get_path(7, "json"),
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
] {
store
.put(&path, data.clone().into())
.await
.expect("put log file in store");
}
});

let client = ObjectStoreFileSystemClient::new(
store,
Path::from("/"),
Arc::new(TokioBackgroundExecutor::new()),
);

let checkpoint_metadata = CheckpointMetadata {
version: 3,
size: 10,
parts: None,
size_in_bytes: None,
num_of_add_files: None,
checkpoint_schema: None,
checksum: None,
};
let url = Url::parse("memory:///_delta_log/").expect("valid url");
let (commit_files, checkpoint_files) =
list_log_files_with_checkpoint(&checkpoint_metadata, &client, &url).unwrap();
assert_eq!(checkpoint_files.len(), 1);
assert_eq!(commit_files.len(), 2);
assert_eq!(checkpoint_files[0].version, 5);
assert_eq!(commit_files[0].version, 7);
assert_eq!(commit_files[1].version, 6);
}

fn valid_last_checkpoint() -> Vec<u8> {
r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
}
Expand Down
Loading