Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Table::checkpoint() API #797

Open
wants to merge 146 commits into
base: main
Choose a base branch
from

Conversation

sebastiantia
Copy link
Collaborator

@sebastiantia sebastiantia commented Apr 2, 2025

What changes are proposed in this pull request?

Please ignore the checkpoint/log_replay mod (change in stacked PR #774)

This PR implements the checkpoint API, enabling table checkpoints that provide compact summaries of table state for faster recovery without full log replay.

Key Features

  • Adds a Table::checkpoint() method that automatically selects the appropriate checkpoint type based on table feature support:

    • Tables with v2Checkpoints feature → Creates a Single-file Classic-named V2 Checkpoint
    • Tables without v2Checkpoints feature → Creates a Single-file Classic-named V1 Checkpoint
  • Implements the CheckpointWriter.

    • Handles the generation of checkpoint data & path on call to .checkpoint_data()
    • Handles the finalization of the checkpointing process by writing the _last_checkpoint file on call to.finalize().
    • Note: we require the engine to write the entire checkpoint file to storage before calling .finalize(), otherwise the table may be corrupted.

API Overview

The checkpoint workflow for the engine consists of:

  1. Creating a CheckpointWriter via Table::checkpoint(engine, version)
  2. Retrieving checkpoint data with CheckpointWriter::get_data()
  3. Writing the data to storage (implementation-specific)
  4. Finalizing the checkpoint by calling CheckpointWriter.finalize()

This PR is stacked on #774.

Please only review these commits.

How was this change tested?

Unit tests in checkpoint/mod.rs
test_deleted_file_retention_timestamp - tests file retention timestamp calculations
test_create_checkpoint_metadata_batch_when_v2_checkpoints_is_supported
test_create_checkpoint_metadata_batch_when_v2_checkpoints_not_supported
test_create_last_checkpoint_metadata
test_create_last_checkpoint_metadata_with_invalid_batch

'Integration tests' in checkpoint/tests.rs
test_v1_checkpoint_latest_version_by_default: table that does not support v2Checkpoint, no checkpoint version specified
test_v1_checkpoint_specific_version: table that does not support v2Checkpoint, checkpointing at a specific version
test_v2_checkpoint_supported_table: table that supports v2Checkpoint & no version is specified
test_checkpoint_error_handling_invalid_version: checkpoint with a version that does not exist in the log

@sebastiantia sebastiantia marked this pull request as ready for review April 8, 2025 20:10
Copy link
Collaborator

@zachschuermann zachschuermann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few comments to start

&mut self,
engine: &dyn Engine,
) -> DeltaResult<SingleFileCheckpointData> {
if self.data_consumed {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can use types to manage this 'state transition' if needed

Copy link
Collaborator Author

@sebastiantia sebastiantia Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a step back, do we want to limit the engine from calling .checkpoint_data() multiple times to retrieve multiple copies of the checkpoint data & path in the first place? If they really wanted to, they could always just call table.checkpoint() again so I don't see a strong reason to?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is stacked on #744, do not review this file (added in stacked pr)

@sebastiantia sebastiantia removed the breaking-change Change that will require a version bump label Apr 11, 2025
@sebastiantia
Copy link
Collaborator Author

I can separate out the finalize() API to a separate PR if reviewers think this PR is too bloated, lmk!

//!
//! ## Example: Writing a classic-named V1/V2 checkpoint (depending on `v2Checkpoints` feature support)
//!
//! TODO(seb): unignore example
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that I like to do is to use FIXME for stuff I want to fix this PR, and TODO for stuff that I wanna fix in a subsequent issue/PR.

Comment on lines +63 to +67
//! - Single-file UUID-named V2 checkpoints (using `n.checkpoint.u.{json/parquet}` naming) are to be
//! implemented in the future. The current implementation only supports classic-named V2 checkpoints.
//! - Multi-file V2 checkpoints are not supported yet. The API is designed to be extensible for future
//! multi-file support, but the current implementation only supports single-file checkpoints.
//!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see github issues and TODOs associated with these so that they're tracked. Something like:

  • TODO(433): Support Single-file

@zachschuermann This is a pattern I've seen in other repositories, and I'd advocate for this to be the way we track todos.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, great idea! let's try to use that issue numbering i agree that's nice

regarding issues/etc. how about we update the original 'checkpoint support' issue with some clearly documented bits on what was done and what is future work (and can always make the future work into sub-issues, etc.)

i think original issues were #736 and #499 - how about we consolidate (feel free to just close one or both with comments on whatever we select or new one we make

//! let checkpoint_data = writer.checkpoint_data()?;
//!
//! // Write checkpoint data to storage (implementation-specific)
//! let metadata = your_storage_implementation.write_checkpoint(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear to me what your_storage_implementation is. For code docs like this, I'd like for them to be something that actually compiles

}

// The current checkpoint API only supports single-file checkpoints.
let parts: i64 = 1; // Coerce the type to `i64`` to match the expected schema.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let parts: i64 = 1; // Coerce the type to `i64`` to match the expected schema.
let parts = 1i64;

/// - `sizeInBytes` (i64, optional): Size of checkpoint file in bytes
/// - `numOfAddFiles` (i64, optional): Number of Add actions
///
/// Note: The fields `checkpointSchema` and `checksum` are not yet included in this
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you link the todos here?

TODO(xxx): Add checkpointSchema field to _last_checkpoint
TODO(xxx): Add checksum field to _last_checkpoint

Comment on lines +416 to +422
let last_checkpoint_schema = Arc::new(StructType::new([
StructField::not_null("version", DataType::LONG),
StructField::not_null("size", DataType::LONG),
StructField::nullable("parts", DataType::LONG),
StructField::nullable("sizeInBytes", DataType::LONG),
StructField::nullable("numOfAddFiles", DataType::LONG),
]));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declare this somewhere with a static lazy lock, then clone the arc.

Comment on lines +238 to +239
self.total_actions_counter.load(Ordering::Relaxed),
self.add_actions_counter.load(Ordering::Relaxed),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be wary of using Ordering::Relaxed without being absolutely certain things won't go wrong.

Check out the rustinomicon.

I'd advise to keep all the counter accesses/updates as SeqCst. We're not doing a lot of load/stores. Especially when you compare to the huge amount of data we'd be processing, so it's not a big performance hit.

Comment on lines +288 to +291
let schema = Arc::new(StructType::new([StructField::not_null(
CHECKPOINT_METADATA_NAME,
DataType::struct_type([StructField::not_null("version", DataType::LONG)]),
)]));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another opportunity for static LazyLock + clone.

let now_ms: i64 = now_duration
.as_millis()
.try_into()
.map_err(|_| Error::generic("Current timestamp exceeds i64 millisecond range"))?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create an error type for the checkpoint writer. The use of error::generic in our code makes it harder to test these error cases, and it makes debugging really challenging.

Suppose you run a kernel connector, and just gets back "Current timestamp exceeds i64 millisecond range". Where do they even begin to look for the bug?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants