
Commit 59d3b03

feat: add Snapshot::checkpoint() & Table::checkpoint() API (#797)
## What changes are proposed in this pull request?

### Motivation and Context

This is part of the parent issue to provide checkpoint write support in `delta-kernel-rs` (#499). This PR introduces a new snapshot API for writing **single-file checkpoints** to Delta tables (#736), supporting single-file V1 and V2 spec classic-named checkpoint formats.

### Breaking Changes

- Introduces a new error variant that may require handling in downstream code.

### Summary of Changes

- **New Checkpoint API**: Adds `Snapshot::checkpoint` & `Table::checkpoint` as the entry points for checkpoint creation, supporting both V1 and V2 checkpoint specs depending on table features.
- **CheckpointWriter**: Introduces the core orchestrator for checkpoint creation, including data preparation and (future) finalization - todo(#850).
- **CheckpointDataIterator**: An iterator-based mechanism for checkpoint data generation, ensuring accurate action statistics and safe finalization.
- **Explicit Finalization**: Lays the groundwork for a two-step checkpoint process: first write all data, then call `.finalize()` to persist checkpoint metadata - todo(#850).
- **Error Handling**: Adds a new error variant for checkpoint write failures.
- **Extensibility**: The API is designed to accommodate future enhancements (e.g. multi-file V2 checkpoints).

### Major Components and APIs

| API / Struct | Description |
|---------------------------------------|--------------------------------------------------------------------------------------------------------|
| `Error::CheckpointWrite(String)` | New error variant for checkpoint write failures |
| `Snapshot::checkpoint()` | Creates a new `CheckpointWriter` for a snapshot |
| `Table::checkpoint()` | Creates a new `CheckpointWriter` for a version of a table |
| `CheckpointWriter` | Main struct orchestrating checkpoint creation and finalization |
| `CheckpointWriter::checkpoint_path()` | Returns the URL where the checkpoint file should be written |
| `CheckpointWriter::checkpoint_data()` | Returns the checkpoint data (`CheckpointDataIterator`) to be written to the checkpoint file |
| `CheckpointWriter::finalize()` | todo(#850): Finalizes the checkpoint by writing the `_last_checkpoint` file after data is persisted |
| `CheckpointDataIterator` | Iterator over checkpoint actions; accumulates statistics for finalization |
| `CheckpointBatch` (private) | Output of `CheckpointLogReplayProcessor`; contains filtered actions and counts |

### Checkpoint Types

| Table Feature | Resulting Checkpoint Type |
|------------------|------------------------------|
| No v2Checkpoints | Single-file Classic-named V1 |
| v2Checkpoints | Single-file Classic-named V2 |

- **V1**: For legacy tables; no `CheckpointMetadata` action is included.
- **V2**: For tables supporting `v2Checkpoints`; includes a `CheckpointMetadata` action for enhanced metadata.

### Usage Workflow

1. Create a `CheckpointWriter` via `Snapshot::checkpoint` or `Table::checkpoint`
2. Retrieve the checkpoint path from `CheckpointWriter::checkpoint_path()`
3. Retrieve the checkpoint data from `CheckpointWriter::checkpoint_data()`
4. Write the data to the path in object storage (engine-specific)
5. Finalize the checkpoint by calling `CheckpointWriter::finalize()` - todo(#850)

See the sketch below for how an engine might drive this flow.

todo(#850): The `CheckpointWriter::finalize()` API that was previously included in this PR has been split into a separate PR (#851) for ease of review. It handles finalization of the checkpointing process by writing the `_last_checkpoint` file on the call to `.finalize()`. Note: we require the engine to write the entire checkpoint file to storage **before** calling `.finalize()`, otherwise the table may be corrupted. It will be hard for the engine **not** to do this, since the `finalize()` call takes the `FileMeta` of the checkpoint write.
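The following is a minimal, engine-side sketch of that workflow. The exact signatures are assumptions based on the description above (the `engine` argument, the optional version parameter to `Table::checkpoint`, and the item type yielded by the data iterator are not spelled out here), and step 5 is deferred to #850.

```rust
// Illustrative sketch only; signatures are assumed, not the final API.
use delta_kernel::{DeltaResult, Engine, Table};

fn write_single_file_checkpoint(engine: &dyn Engine, table: &Table) -> DeltaResult<()> {
    // 1. Create a CheckpointWriter (assumed: `None` checkpoints the latest version).
    let mut writer = table.checkpoint(engine, None)?;

    // 2. Classic-named single-file checkpoint path (V1 or V2, depending on whether
    //    the table supports the `v2Checkpoints` feature).
    let path = writer.checkpoint_path()?;

    // 3. Stream the checkpoint actions; the iterator also accumulates the action
    //    statistics needed later for finalization.
    for batch in writer.checkpoint_data(engine)? {
        let _batch = batch?;
        // 4. Engine-specific: serialize the filtered actions in this batch and
        //    append them to the parquet checkpoint file at `path` in object storage.
    }

    // 5. todo(#850): once the file is fully persisted, pass its FileMeta to
    //    `writer.finalize(..)` so the `_last_checkpoint` file gets written.
    Ok(())
}
```

The two-step write/finalize split keeps the actual object-store write entirely on the engine side; the kernel only describes what to write and where.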
## How was this change tested?

Unit tests in `checkpoint/mod.rs`:

- `test_deleted_file_retention_timestamp` - tests file retention timestamp calculations
- `test_create_checkpoint_metadata_batch`

Unit tests in `checkpoint/tests.rs`:

- `test_v1_checkpoint_latest_version_by_default`: table that does not support `v2Checkpoint`, no checkpoint version specified
- `test_v1_checkpoint_specific_version`: table that does not support `v2Checkpoint`, checkpointing at a specific version
- `test_v2_checkpoint_supported_table`: table that supports `v2Checkpoint` & no version is specified
1 parent 23e65d3 commit 59d3b03

9 files changed: +864 −64 lines changed


ffi/src/error.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -53,6 +53,7 @@ pub enum KernelError {
     ChangeDataFeedIncompatibleSchema,
     InvalidCheckpoint,
     LiteralExpressionTransformError,
+    CheckpointWriteError,
 }
 
 impl From<Error> for KernelError {
@@ -61,6 +62,7 @@ impl From<Error> for KernelError {
             // NOTE: By definition, no kernel Error maps to FFIError
             #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
             Error::Arrow(_) => KernelError::ArrowError,
+            Error::CheckpointWrite(_) => KernelError::CheckpointWriteError,
            Error::EngineDataType(_) => KernelError::EngineDataTypeError,
            Error::Extract(..) => KernelError::ExtractError,
            Error::Generic(_) => KernelError::GenericError,
```
kernel/src/checkpoint/log_replay.rs

Lines changed: 52 additions & 56 deletions
```diff
@@ -25,42 +25,31 @@
 //! the overall process. For each batch of log actions, it:
 //! 1. Creates a visitor with the current deduplication state
 //! 2. Applies the visitor to filter actions in the batch
-//! 3. Updates counters and state for cross-batch deduplication
-//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which
-//!    actions should be included in the checkpoint file
+//! 3. Tracks state for deduplication across batches
+//! 4. Produces a [`CheckpointBatch`] result which includes both the filtered data and counts of
+//!    actions selected for the checkpoint file
 //!
 //! [`CheckpointMetadata`]: crate::actions::CheckpointMetadata
 use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
-use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor};
+use crate::log_replay::{
+    FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor,
+};
 use crate::scan::data_skipping::DataSkippingFilter;
 use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
 use crate::utils::require;
 use crate::{DeltaResult, EngineData, Error};
 
 use std::collections::HashSet;
-use std::sync::atomic::{AtomicI64, Ordering};
-use std::sync::{Arc, LazyLock};
+use std::sync::LazyLock;
 
 /// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`]
 /// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. This
 /// processor is leveraged when creating a single-file V2 checkpoint as the V2 spec schema is
 /// a superset of the V1 spec schema, with the addition of a [`CheckpointMetadata`] action.
-///
-/// It processes each action batch via the `process_actions_batch` method, using the
-/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions
-/// should be included in the checkpoint.
-#[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
 pub(crate) struct CheckpointLogReplayProcessor {
     /// Tracks file actions that have been seen during log replay to avoid duplicates.
     /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
     seen_file_keys: HashSet<FileActionKey>,
-    // Arc<AtomicI64> provides shared mutability for our counters, allowing both the
-    // iterator to update the values during processing and the caller to observe the final
-    // counts afterward. The counters are i64 to match the `_last_checkpoint` file schema.
-    // Tracks the total number of actions included in the checkpoint file.
-    actions_count: Arc<AtomicI64>,
-    // Tracks the total number of add actions included in the checkpoint file.
-    add_actions_count: Arc<AtomicI64>,
     /// Indicates whether a protocol action has been seen in the log.
     seen_protocol: bool,
     /// Indicates whether a metadata action has been seen in the log.
@@ -71,11 +60,31 @@ pub(crate) struct CheckpointLogReplayProcessor {
     minimum_file_retention_timestamp: i64,
 }
 
+/// This struct is the output of the [`CheckpointLogReplayProcessor`].
+///
+/// It contains the filtered batch of actions to be included in the checkpoint,
+/// along with statistics about the number of actions filtered for inclusion.
+pub(crate) struct CheckpointBatch {
+    /// The filtered batch of actions to be included in the checkpoint.
+    pub(crate) filtered_data: FilteredEngineData,
+    /// The number of actions in the batch filtered for inclusion in the checkpoint.
+    pub(crate) actions_count: i64,
+    /// The number of add actions in the batch filtered for inclusion in the checkpoint.
+    pub(crate) add_actions_count: i64,
+}
+
+impl HasSelectionVector for CheckpointBatch {
+    fn has_selected_rows(&self) -> bool {
+        self.filtered_data.has_selected_rows()
+    }
+}
+
 impl LogReplayProcessor for CheckpointLogReplayProcessor {
-    type Output = FilteredEngineData;
+    type Output = CheckpointBatch;
 
     /// Processes a batch of actions read from the log during reverse chronological replay
-    /// and returns a filtered batch ([`FilteredEngineData`]) to be included in the checkpoint.
+    /// and returns a [`CheckpointBatch`], which contains the filtered actions to be
+    /// included in the checkpoint file, along with statistics about the included actions.
     ///
     /// This method delegates the filtering logic to the [`CheckpointVisitor`], which implements
     /// the deduplication rules described in the module documentation. The method tracks
@@ -100,23 +109,19 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor {
         );
         visitor.visit_rows_of(batch.as_ref())?;
 
-        // Update the total actions and add actions counters. Relaxed ordering is sufficient
-        // here as we only care about the total count when writing the _last_checkpoint file.
-        // (the ordering is not important for correctness)
-        self.actions_count.fetch_add(
-            visitor.file_actions_count + visitor.non_file_actions_count,
-            Ordering::Relaxed,
-        );
-        self.add_actions_count
-            .fetch_add(visitor.add_actions_count, Ordering::Relaxed);
-
         // Update protocol and metadata seen flags
         self.seen_protocol = visitor.seen_protocol;
         self.seen_metadata = visitor.seen_metadata;
 
-        Ok(FilteredEngineData {
+        let filtered_data = FilteredEngineData {
             data: batch,
             selection_vector: visitor.selection_vector,
+        };
+
+        Ok(CheckpointBatch {
+            filtered_data,
+            actions_count: visitor.non_file_actions_count + visitor.file_actions_count,
+            add_actions_count: visitor.add_actions_count,
         })
     }
 
@@ -127,16 +132,9 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor {
 }
 
 impl CheckpointLogReplayProcessor {
-    #[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
-    pub(crate) fn new(
-        actions_count: Arc<AtomicI64>,
-        add_actions_count: Arc<AtomicI64>,
-        minimum_file_retention_timestamp: i64,
-    ) -> Self {
+    pub(crate) fn new(minimum_file_retention_timestamp: i64) -> Self {
         Self {
             seen_file_keys: Default::default(),
-            actions_count,
-            add_actions_count,
             seen_protocol: false,
             seen_metadata: false,
             seen_txns: Default::default(),
@@ -463,10 +461,13 @@ impl RowVisitor for CheckpointVisitor<'_> {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
+
     use super::*;
     use crate::arrow::array::StringArray;
     use crate::utils::test_utils::{action_batch, parse_json_batch};
-    use std::collections::HashSet;
+
+    use itertools::Itertools;
 
     /// Helper function to create test batches from JSON strings
     fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> {
@@ -478,23 +479,18 @@ mod tests {
     fn run_checkpoint_test(
         input_batches: Vec<(Box<dyn EngineData>, bool)>,
     ) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
-        let actions_count = Arc::new(AtomicI64::new(0));
-        let add_actions_count = Arc::new(AtomicI64::new(0));
-        let results: Vec<_> = CheckpointLogReplayProcessor::new(
-            actions_count.clone(),
-            add_actions_count.clone(),
-            0, // minimum_file_retention_timestamp
-        )
-        .process_actions_iter(input_batches.into_iter().map(Ok))
-        .collect::<DeltaResult<Vec<_>>>()?;
-
-        Ok((
-            results,
-            actions_count.load(Ordering::Relaxed),
-            add_actions_count.load(Ordering::Relaxed),
-        ))
+        let processed_batches: Vec<_> = CheckpointLogReplayProcessor::new(0)
+            .process_actions_iter(input_batches.into_iter().map(Ok))
+            .try_collect()?;
+        let total_count: i64 = processed_batches.iter().map(|b| b.actions_count).sum();
+        let add_count: i64 = processed_batches.iter().map(|b| b.add_actions_count).sum();
+        let filtered_data = processed_batches
+            .into_iter()
+            .map(|b| b.filtered_data)
+            .collect();
+
+        Ok((filtered_data, total_count, add_count))
     }
-
     #[test]
     fn test_checkpoint_visitor() -> DeltaResult<()> {
         let data = action_batch();
```