Fix bug with out of date last checkpoint, and clean listing function #354

Merged: 15 commits, Oct 2, 2024

Changes from all commits
132 changes: 118 additions & 14 deletions kernel/src/snapshot.rs
@@ -283,7 +283,7 @@ struct CheckpointMetadata {
     /// The number of actions that are stored in the checkpoint.
     pub(crate) size: i64,
     /// The number of fragments if the last checkpoint was written in multiple parts.
-    pub(crate) parts: Option<i32>,
+    pub(crate) parts: Option<usize>,
     /// The number of bytes of the checkpoint.
    pub(crate) size_in_bytes: Option<i64>,
     /// The number of AddFile actions in the checkpoint.
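One likely reason for the i32-to-usize switch (a hedged reading, not stated in the PR): Vec::len() returns usize, so a usize part count can be compared directly against the number of checkpoint files found during listing, without a cast. A minimal standalone sketch, with a hypothetical helper name:

// Hypothetical helper, not from the PR: with `parts` as Option<usize>, the
// multi-part validation compares directly against a usize count.
fn parts_match(parts: Option<usize>, found_parts: usize) -> bool {
    // a missing `parts` field in _last_checkpoint implies a single-part checkpoint
    found_parts == parts.unwrap_or(1)
}

fn main() {
    assert!(parts_match(None, 1));
    assert!(parts_match(Some(3), 3));
    assert!(!parts_match(Some(3), 2)); // a mismatch is surfaced as an error in the new listing code
}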
@@ -321,29 +321,67 @@ fn read_last_checkpoint(

 /// List all log files after a given checkpoint.
 fn list_log_files_with_checkpoint(
-    cp: &CheckpointMetadata,
+    checkpoint_metadata: &CheckpointMetadata,
     fs_client: &dyn FileSystemClient,
     log_root: &Url,
 ) -> DeltaResult<(Vec<ParsedLogPath>, Vec<ParsedLogPath>)> {
-    let version_prefix = format!("{:020}", cp.version);
+    let version_prefix = format!("{:020}", checkpoint_metadata.version);
     let start_from = log_root.join(&version_prefix)?;

-    let files: Vec<ParsedLogPath> = fs_client
-        .list_from(&start_from)?
-        .flat_map(|file| file.and_then(ParsedLogPath::try_from).transpose())
-        .try_collect()?;
-    let (mut commit_files, checkpoint_files): (Vec<_>, _) = files
-        .into_iter()
+    let mut max_checkpoint_version = checkpoint_metadata.version;
+    let mut checkpoint_files = vec![];
+    // We expect 10 commit files per checkpoint, so start with that size. We could adjust this based
+    // on config at some point
+    let mut commit_files = Vec::with_capacity(10);

+    for meta_res in fs_client.list_from(&start_from)? {
+        let meta = meta_res?;
+        let parsed_path = ParsedLogPath::try_from(meta)?;
         // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
-        .filter(|p| p.is_commit() || p.is_checkpoint())
-        .partition(ParsedLogPath::is_commit);
+        if let Some(parsed_path) = parsed_path {
+            if parsed_path.is_commit() {
+                commit_files.push(parsed_path);
+            } else if parsed_path.is_checkpoint() {
+                match parsed_path.version.cmp(&max_checkpoint_version) {
+                    Ordering::Greater => {
+                        max_checkpoint_version = parsed_path.version;
+                        checkpoint_files.clear();
+                        checkpoint_files.push(parsed_path);
+                    }
+                    Ordering::Equal => checkpoint_files.push(parsed_path),
+                    Ordering::Less => {}
+                }
+            }
+        }
+    }
+
+    if checkpoint_files.is_empty() {
+        // TODO: We could potentially recover here
+        return Err(Error::generic(
+            "Had a _last_checkpoint hint but didn't find any checkpoints",
+        ));
+    }
+
+    if max_checkpoint_version != checkpoint_metadata.version {
+        warn!(
+            "_last_checkpoint hint is out of date. _last_checkpoint version: {}. Using actual most recent: {}",
+            checkpoint_metadata.version,
+            max_checkpoint_version
+        );
+        // we (may) need to drop commits that are before the _actual_ last checkpoint (that
+        // is, commits between a stale _last_checkpoint and the _actual_ last checkpoint)
+        commit_files.retain(|parsed_path| parsed_path.version > max_checkpoint_version);
+    } else if checkpoint_files.len() != checkpoint_metadata.parts.unwrap_or(1) {
+        return Err(Error::Generic(format!(
+            "_last_checkpoint indicated that checkpoint should have {} parts, but it has {}",
+            checkpoint_metadata.parts.unwrap_or(1),
+            checkpoint_files.len()
+        )));
+    }

     // NOTE this will sort in reverse order
     commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version));

-    // TODO raise a proper error
-    assert_eq!(checkpoint_files.len(), cp.parts.unwrap_or(1) as usize);
-
     Ok((commit_files, checkpoint_files))
 }
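The core of the fix, restated as a minimal self-contained sketch (plain version numbers instead of ParsedLogPath, and a hypothetical function name): when listing finds a checkpoint newer than the _last_checkpoint hint, that newer version wins, commits at or below it are dropped, and the remaining commits are ordered newest-first.

use std::cmp::Reverse;

// Keep only commits newer than the newest checkpoint actually present, newest-first.
fn prune_commits(commit_versions: &mut Vec<u64>, checkpoint_versions: &[u64], hint_version: u64) -> u64 {
    let max_checkpoint = checkpoint_versions.iter().copied().max().unwrap_or(hint_version);
    // commits at or before the real latest checkpoint are already covered by it
    commit_versions.retain(|&v| v > max_checkpoint);
    // newest-first, mirroring the reverse sort in list_log_files_with_checkpoint
    commit_versions.sort_unstable_by_key(|&v| Reverse(v));
    max_checkpoint
}

fn main() {
    // the hint says version 3, but checkpoints 3 and 5 and commits 4, 6, 7 are on disk
    let mut commits = vec![4, 6, 7];
    assert_eq!(prune_commits(&mut commits, &[3, 5], 3), 5);
    assert_eq!(commits, vec![7, 6]); // the same expectations the new test below checks
}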

@@ -471,6 +509,72 @@ mod tests {
         assert!(cp.is_none())
     }

+    #[test]
+    fn test_read_log_with_out_of_date_last_checkpoint() {
+        let store = Arc::new(InMemory::new());
+
+        fn get_path(index: usize, suffix: &str) -> Path {
+            let path = format!("_delta_log/{index:020}.{suffix}");
+            Path::from(path.as_str())
+        }
+        let data = bytes::Bytes::from("kernel-data");
+
+        let checkpoint_metadata = CheckpointMetadata {
+            version: 3,
+            size: 10,
+            parts: None,
+            size_in_bytes: None,
+            num_of_add_files: None,
+            checkpoint_schema: None,
+            checksum: None,
+        };
+
+        // add log files to store
+        tokio::runtime::Runtime::new()
+            .expect("create tokio runtime")
+            .block_on(async {
+                for path in [
+                    get_path(0, "json"),
+                    get_path(1, "checkpoint.parquet"),
+                    get_path(2, "json"),
+                    get_path(3, "checkpoint.parquet"),
+                    get_path(4, "json"),
+                    get_path(5, "checkpoint.parquet"),
+                    get_path(6, "json"),
+                    get_path(7, "json"),
+                ] {
+                    store
+                        .put(&path, data.clone().into())
+                        .await
+                        .expect("put log file in store");
+                }
+                let checkpoint_str =
+                    serde_json::to_string(&checkpoint_metadata).expect("Serialize checkpoint");
+                store
+                    .put(
+                        &Path::from("_delta_log/_last_checkpoint"),
+                        checkpoint_str.into(),
+                    )
+                    .await
+                    .expect("Write _last_checkpoint");
+            });
+
+        let client = ObjectStoreFileSystemClient::new(
+            store,
+            Path::from("/"),
+            Arc::new(TokioBackgroundExecutor::new()),
+        );
+
+        let url = Url::parse("memory:///_delta_log/").expect("valid url");
+        let (commit_files, checkpoint_files) =
+            list_log_files_with_checkpoint(&checkpoint_metadata, &client, &url).unwrap();
+        assert_eq!(checkpoint_files.len(), 1);
+        assert_eq!(commit_files.len(), 2);
+        assert_eq!(checkpoint_files[0].version, 5);
+        assert_eq!(commit_files[0].version, 7);
+        assert_eq!(commit_files[1].version, 6);
+    }

     fn valid_last_checkpoint() -> Vec<u8> {
         r#"{"size":8,"size_in_bytes":21857,"version":1}"#.as_bytes().to_vec()
     }
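For reference, a small sketch of how a _last_checkpoint like the one above deserializes (assuming serde and serde_json; LastCheckpointHint is a local mirror for illustration, not the kernel's struct): fields absent from the JSON come back as None, which is why the listing code falls back to parts.unwrap_or(1).

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct LastCheckpointHint {
    version: u64,
    size: i64,
    parts: Option<usize>,
    size_in_bytes: Option<i64>,
}

fn main() {
    let raw = r#"{"size":8,"size_in_bytes":21857,"version":1}"#;
    let hint: LastCheckpointHint = serde_json::from_str(raw).expect("parse _last_checkpoint");
    assert_eq!(hint.version, 1);
    assert_eq!(hint.parts, None); // absent => treated as a single-part checkpoint
    assert_eq!(hint.size_in_bytes, Some(21857));
}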