Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: incomplete multi-part checkpoint handling when no hint is provided #641

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

sebastiantia
Copy link
Collaborator

@sebastiantia sebastiantia commented Jan 13, 2025

What changes are proposed in this pull request?

Problem behavior:
When constructing a LogSegment, we scan the _delta_log directory for checkpoints and commits. The most recent checkpoint is collected from the log, but the kernel does not check that it has collected a full multi-part checkpoint. Thus, the checkpoint returned may be incomplete. More context can be found here #497

Expected behavior:
Keep track of the most recent complete checkpoint when iterating through the _delta_log directory and warn the user when incomplete checkpoints are encountered.
build_snapshot_with_out_of_date_last_checkpoint showcases the expected behavior, which now passes ✅ .

  • New test case build_snapshot_with_unsupported_uuid_checkpoint verifying that currently unsupported UuidCheckpoints are not considered valid checkpoints.

How was this change tested?

  • build_snapshot_with_out_of_date_last_checkpoint is the intended behavior of scenario described in Log file listing should handle incomplete multi-part checkpoints #497
  • build_snapshot_with_out_of_date_last_checkpoint_and_incomplete_recent_checkpoint. Prior to this PR, when an out of date checkpoint hint is provided, AND there is a more recent incomplete checkpoint, the more recent incomplete checkpoint would be taken and assumed to be complete - resulting in errors building the snapshot. Now, the more recent incomplete checkpoint is ignored.
  • build_snapshot_with_unsupported_uuid_checkpoint verifies that currently unsupported UuidCheckpoints are not considered valid checkpoints.
  • build_snapshot_with_multiple_incomplete_multipart_checkpoints verifies that multiple incomplete multi-part checkpoints that interleave are validated appropriately, more context here: fix: incomplete multi-part checkpoint handling when no hint is provided #641 (comment)

resolves #497

Copy link

codecov bot commented Jan 13, 2025

Codecov Report

Attention: Patch coverage is 85.29412% with 25 lines in your changes missing coverage. Please review.

Project coverage is 83.94%. Comparing base (e6aefda) to head (206250d).
Report is 5 commits behind head on main.

Files with missing lines Patch % Lines
kernel/src/log_segment.rs 62.68% 22 Missing and 3 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #641      +/-   ##
==========================================
+ Coverage   83.48%   83.94%   +0.46%     
==========================================
  Files          75       75              
  Lines       16919    17100     +181     
  Branches    16919    17100     +181     
==========================================
+ Hits        14124    14355     +231     
+ Misses       2141     2080      -61     
- Partials      654      665      +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

.filter_map(|res| match res {
Ok(path) => Some(path),
Err(e) => {
warn!("Error processing path: {:?}", e);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skipping ParsedLogPath::try_from() errors here as we were already filtering them out

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I think it's dangerous to skip the errors here. It's best to try to return the error somehow. Does it work if you do something like chunk_by(|path| path.map(|x| x.version)).

The proposed approach tries chunking by DeltaResult<Version> instead of chunking by Version. The hope is to return the Err if we encounter it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chunk_by function requires that the keys it uses for grouping implement the PartialEq trait so it can compare them, but the Error in the DeltaResult does not implement the trait so it doesn't work...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use try_collect to handle errors before grouping.

let log_files = list_log_files(fs_client, log_root, start_version, end_version)?;

let log_files: Vec<ParsedLogPath> = log_files.try_collect()?;

for (version, files) in &log_files.into_iter().chunk_by(|path| path.version)

Copy link
Collaborator

@OussamaSaoudi OussamaSaoudi Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was rly tricky and I got nerdsniped thinking about the iterator stuff lol. But look into process_result.

I think I might've adapted your code right, but double check:

    let mut checkpoint_parts = vec![];
    let mut max_checkpoint_version = start_version;
    let mut commit_files = Vec::with_capacity(10);
    process_results(log_files, |iter| {
        let log_files = iter.chunk_by(move |x| x.version);

        for (version, files) in &log_files {
            let mut new_checkpoint_parts = vec![];

            for file in files {
                if file.is_commit() {
                    commit_files.push(file);
                } else if file.is_checkpoint() {
                    new_checkpoint_parts.push(file);
                }
            }
            if validate_checkpoint_parts(version, &new_checkpoint_parts) {
                max_checkpoint_version = Some(version);
                checkpoint_parts = new_checkpoint_parts;
            }
        }
    })?;
    Ok((commit_files, checkpoint_parts))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: With ordered listing, checkpoint files always come before commit files. So in theory, we know whether we have a complete checkpoint by the time we encounter the commit file of a given version. Not sure if that allows simpler code or not tho.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use try collect and propagate the errors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely agreed that we want to propagate errors. I wanted to avoid try_collect on all the log files because there could be a lot of them. process_results + chunky_by should only have 1 commit at a time in memory.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice find withprocess_results, it works nicely!

Copy link
Collaborator Author

@sebastiantia sebastiantia Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clearing the commit files when encountering complete checkpoints makes sense, and thank you for the helpful context @scovich

.filter_map(|res| match res {
Ok(path) => Some(path),
Err(e) => {
warn!("Error processing path: {:?}", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I think it's dangerous to skip the errors here. It's best to try to return the error somehow. Does it work if you do something like chunk_by(|path| path.map(|x| x.version)).

The proposed approach tries chunking by DeltaResult<Version> instead of chunking by Version. The hope is to return the Err if we encounter it.

Comment on lines 336 to 337
if validate_checkpoint_parts(version, &new_checkpoint_parts)
&& (max_checkpoint_version.is_none() || Some(version) >= max_checkpoint_version)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can just validate. iirc the listing should be in order. So the latest checkpoint should always be greater.

Copy link
Collaborator Author

@sebastiantia sebastiantia Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm right, looks like we were assuming that to begin with as we returned commit files in the order given from list_log_files, and soon after require that they are ascending in version # here...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets rid of max_checkpoint_version entirely :)


match checkpoint_parts.last().map(|file| &file.file_type) {
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
if *num_parts as usize != checkpoint_parts.len() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be worth checking that:

  1. all the checkpoint parts are indeed of type MultiPartCheckpoint
  2. that the set of multi-part checkpoints has parts 0..n.

@zachschuermann what do you think?

Copy link
Collaborator

@scovich scovich Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's actually 1..=n. And yes, we should check. Also have to be careful because technically there could be two incomplete checkpoints with different num_parts for the same version. Also, we MUST accept at most one checkpoint -- even if multiple complete checkpoints are available -- so this function needs to filter, not just check.

Unfortunately, the poorly-chosen naming convention for multi-part checkpoint files means they interleave:

00000000000000000010.checkpoint.0000000001.0000000003.parquet
00000000000000000010.checkpoint.0000000001.0000000004.parquet
00000000000000000010.checkpoint.0000000002.0000000003.parquet
00000000000000000010.checkpoint.0000000002.0000000004.parquet
00000000000000000010.checkpoint.0000000003.0000000003.parquet
00000000000000000010.checkpoint.0000000003.0000000004.parquet
00000000000000000010.checkpoint.0000000004.0000000004.parquet

... which makes it a lot harder to identify the files of a given checkpoint and also means we can't just return a subslice in case there were multiple checkpoints to choose from.

We'd probably need to build a hash map keyed by number of parts:

let mut checkpoints = HashMap::new();
for part_file in checkpoint_parts {
    use LogPathFileType::*;
    match &file.file_type {
        SinglePartCheckpoint 
            | UuidCheckpoint(_) 
            | MultiPartCheckpoint { part_num: 1, num_parts: 1 } => 
        {
           // All single-file checkpoints are equivalent, just keep one
           checkpoints.insert(1, vec![part_file]);
        }
        MultiPartCheckpoint { part_num: 1, num_parts } => {
            // Start a new multi-part checkpoint with at least 2 parts
            checkpoints.insert(num_parts, vec![part_file]);
        }
        MultiPartCheckpoint { part_num, num_parts } => {
            // Continue a new multi-part checkpoint with at least 2 parts
            if let Some(part_files) = checkpoints.get_mut(num_parts) {
                if part_num == 1 + part_files.size() {
                    // Safe to append because all previous parts exist
                    part_files.append(part_file); 
                }
            }
        }
        Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
    } 
}
checkpoints
    .into_iter()
    .find(|(num_parts, part_files)| part_files.len() == num_parts)
    .map_or(vec![], |(_, part_files)| part_files)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this reminds me to use match statements to their full power in the future. Thx for the example Ryan!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, did not consider the multiple incomplete checkpoints. I'll introduce tests to cover some of these scenarios. And thanks a lot for the example!

}
}
// TODO: Include UuidCheckpoint once we actually support v2 checkpoints
_ => {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think uuidcheckpoint should return false, since we can't read that checkpoint. In general, beware catchall cases in match statements

I also wonder if we should panic/error if we ever get a commit file here, since that should not be happening.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, good catch. I think returning an Error in the catchall case would be a good idea as we really should not get anything other than LogPathFileType::SinglePartCheckpoint or LogPathFileType::MultiPartCheckpoint here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say handle the Uuid case separately from the catchall. Leave a comment that says this case will be supported in CheckpointV2.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+10 avoid catchall in match statements. Better to enumerate the known-invalid cases, so that when a new case shows up the compiler forces us to categorize it.

@sebastiantia sebastiantia changed the title [WIP] fix: incomplete multi-part checkpoint handling when no hint is provided fix: incomplete multi-part checkpoint handling when no hint is provided Jan 14, 2025
@sebastiantia sebastiantia marked this pull request as ready for review January 14, 2025 18:14
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing comments (possibly stale -- a newer version was pushed during review)

.filter_map(|res| match res {
Ok(path) => Some(path),
Err(e) => {
warn!("Error processing path: {:?}", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: We only need to keep commit files after (**) the latest complete checkpoint, so we should probably commit_files.clear() whenever we find a complete checkpoint.

(**) Delta spark also keeps the commit that corresponds to the checkpoint, in order to assign the file's timestamp as the snapshot timestamp. However:

  1. All actual timestamp-based time travel is resolved to an actual version, by a different code path called the Delta history manager, before requesting the snapshot by that version.
  2. The file timestamp is not always the snapshot timestamp, because clock skew could mean an earlier commit has a later timestamp. The Delta history manager includes logic to "adjust" the timestamp forward as needed in such cases, to ensure monotonicity.
  3. When in-commit timestamps are enabled, the file timestamp isn't even meaningful. Delta history manager also has logic to handle that case.

Based on all of the above, the snapshot may not even need a timestamp value. Even if it does, the value should be the adjusted (or ICT) value from a history manager component (not yet existing in kernel) -- not the "raw" timestamp from the file.

.filter_map(|res| match res {
Ok(path) => Some(path),
Err(e) => {
warn!("Error processing path: {:?}", e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: With ordered listing, checkpoint files always come before commit files. So in theory, we know whether we have a complete checkpoint by the time we encounter the commit file of a given version. Not sure if that allows simpler code or not tho.


match checkpoint_parts.last().map(|file| &file.file_type) {
Some(LogPathFileType::MultiPartCheckpoint { num_parts, .. }) => {
if *num_parts as usize != checkpoint_parts.len() {
Copy link
Collaborator

@scovich scovich Jan 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's actually 1..=n. And yes, we should check. Also have to be careful because technically there could be two incomplete checkpoints with different num_parts for the same version. Also, we MUST accept at most one checkpoint -- even if multiple complete checkpoints are available -- so this function needs to filter, not just check.

Unfortunately, the poorly-chosen naming convention for multi-part checkpoint files means they interleave:

00000000000000000010.checkpoint.0000000001.0000000003.parquet
00000000000000000010.checkpoint.0000000001.0000000004.parquet
00000000000000000010.checkpoint.0000000002.0000000003.parquet
00000000000000000010.checkpoint.0000000002.0000000004.parquet
00000000000000000010.checkpoint.0000000003.0000000003.parquet
00000000000000000010.checkpoint.0000000003.0000000004.parquet
00000000000000000010.checkpoint.0000000004.0000000004.parquet

... which makes it a lot harder to identify the files of a given checkpoint and also means we can't just return a subslice in case there were multiple checkpoints to choose from.

We'd probably need to build a hash map keyed by number of parts:

let mut checkpoints = HashMap::new();
for part_file in checkpoint_parts {
    use LogPathFileType::*;
    match &file.file_type {
        SinglePartCheckpoint 
            | UuidCheckpoint(_) 
            | MultiPartCheckpoint { part_num: 1, num_parts: 1 } => 
        {
           // All single-file checkpoints are equivalent, just keep one
           checkpoints.insert(1, vec![part_file]);
        }
        MultiPartCheckpoint { part_num: 1, num_parts } => {
            // Start a new multi-part checkpoint with at least 2 parts
            checkpoints.insert(num_parts, vec![part_file]);
        }
        MultiPartCheckpoint { part_num, num_parts } => {
            // Continue a new multi-part checkpoint with at least 2 parts
            if let Some(part_files) = checkpoints.get_mut(num_parts) {
                if part_num == 1 + part_files.size() {
                    // Safe to append because all previous parts exist
                    part_files.append(part_file); 
                }
            }
        }
        Commit | CompactedCommit { .. } | Unknown => {} // invalid file type => do nothing
    } 
}
checkpoints
    .into_iter()
    .find(|(num_parts, part_files)| part_files.len() == num_parts)
    .map_or(vec![], |(_, part_files)| part_files)

@sebastiantia
Copy link
Collaborator Author

Flushing comments (possibly stale -- a newer version was pushed during review)

Thanks for this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Log file listing should handle incomplete multi-part checkpoints
5 participants