Commit 9f5671c
feat: add CheckpointLogReplayProcessor in new checkpoints mod (#744)
## What changes are proposed in this pull request?

resolves #743

When the checkpoint API is called, we need to return all actions to be included in the checkpoint file for the engine to write. The returned actions are `EngineData` batches, where each batch carries an accompanying selection vector that tells the engine which actions in the batch to write to the checkpoint file and which to skip.

To generate this filtered actions iterator, this PR introduces the **`CheckpointLogReplayProcessor`**, which implements the **`LogReplayProcessor`** trait. The processor is applied to an iterator of action batches read from the log segment via the new **`checkpoint_actions_iter`** method, transforming the input iterator into an iterator of **`FilteredEngineData`**: the log data paired with a selection vector indicating which rows should be included in the checkpoint file.

## How was this change tested?

These tests exercise the application of the `CheckpointVisitor` across multiple batches (visiting of individual batches is already covered by existing tests):

- `test_checkpoint_actions_iter_non_file_actions`
- `test_checkpoint_actions_iter_file_actions`
- `test_checkpoint_actions_iter_file_actions_with_deletion_vectors`
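To make the `FilteredEngineData` contract concrete, here is a minimal, self-contained sketch of how an engine might consume the iterator. The stand-in types and the `write_checkpoint` helper are hypothetical, not the kernel's actual API (in the kernel, `data` is a `Box<dyn EngineData>` batch rather than strings):

```rust
/// Hypothetical stand-in for the kernel's `FilteredEngineData`: a batch of
/// actions plus a selection vector saying which rows belong in the checkpoint.
struct FilteredEngineData {
    data: Vec<String>,           // stand-in for an `EngineData` batch of actions
    selection_vector: Vec<bool>, // true => write this action to the checkpoint
}

/// Writes only the selected rows of each batch, in order.
fn write_checkpoint(batches: impl Iterator<Item = FilteredEngineData>) -> Vec<String> {
    let mut checkpoint_rows = Vec::new();
    for batch in batches {
        for (row, selected) in batch.data.into_iter().zip(batch.selection_vector) {
            if selected {
                checkpoint_rows.push(row);
            }
        }
    }
    checkpoint_rows
}

fn main() {
    let batch = FilteredEngineData {
        data: vec!["add file1".into(), "remove file1 (duplicate)".into()],
        selection_vector: vec![true, false],
    };
    // Only the selected `add` action survives into the checkpoint.
    assert_eq!(write_checkpoint(std::iter::once(batch)), vec!["add file1"]);
}
```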
1 parent a2980ed commit 9f5671c

File tree

6 files changed: +348 -68 lines

kernel/src/checkpoint/log_replay.rs

Lines changed: 269 additions & 20 deletions
@@ -26,16 +26,124 @@
 //! 1. Creates a visitor with the current deduplication state
 //! 2. Applies the visitor to filter actions in the batch
 //! 3. Updates counters and state for cross-batch deduplication
-//! 4. Produces a [`CheckpointData`] result which includes a selection vector indicating which
+//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which
 //!    actions should be included in the checkpoint file
-use std::collections::HashSet;
-use std::sync::LazyLock;
-
-use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
-use crate::log_replay::{FileActionDeduplicator, FileActionKey};
+//!
+//! [`CheckpointMetadata`]: crate::actions::CheckpointMetadata
+use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
+use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor};
+use crate::scan::data_skipping::DataSkippingFilter;
 use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
 use crate::utils::require;
-use crate::{DeltaResult, Error};
+use crate::{DeltaResult, EngineData, Error};
+
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::{Arc, LazyLock};
+
+/// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`]
+/// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. This
+/// processor is leveraged when creating a single-file V2 checkpoint as the V2 spec schema is
+/// a superset of the V1 spec schema, with the addition of a [`CheckpointMetadata`] action.
+///
+/// It processes each action batch via the `process_actions_batch` method, using the
+/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions
+/// should be included in the checkpoint.
+#[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
+pub(crate) struct CheckpointLogReplayProcessor {
+    /// Tracks file actions that have been seen during log replay to avoid duplicates.
+    /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
+    seen_file_keys: HashSet<FileActionKey>,
+    // Arc<AtomicI64> provides shared mutability for our counters, allowing both the
+    // iterator to update the values during processing and the caller to observe the final
+    // counts afterward. The counters are i64 to match the `_last_checkpoint` file schema.
+    // Tracks the total number of actions included in the checkpoint file.
+    actions_count: Arc<AtomicI64>,
+    // Tracks the total number of add actions included in the checkpoint file.
+    add_actions_count: Arc<AtomicI64>,
+    /// Indicates whether a protocol action has been seen in the log.
+    seen_protocol: bool,
+    /// Indicates whether a metadata action has been seen in the log.
+    seen_metadata: bool,
+    /// Set of transaction app IDs that have been processed to avoid duplicates.
+    seen_txns: HashSet<String>,
+    /// Minimum timestamp for file retention, used for filtering expired tombstones.
+    minimum_file_retention_timestamp: i64,
+}
+
+impl LogReplayProcessor for CheckpointLogReplayProcessor {
+    type Output = FilteredEngineData;
+
+    /// Processes a batch of actions read from the log during reverse chronological replay
+    /// and returns a filtered batch ([`FilteredEngineData`]) to be included in the checkpoint.
+    ///
+    /// This method delegates the filtering logic to the [`CheckpointVisitor`], which implements
+    /// the deduplication rules described in the module documentation. The method tracks
+    /// statistics about processed actions (total count, add actions count) and maintains
+    /// state for cross-batch deduplication.
+    fn process_actions_batch(
+        &mut self,
+        batch: Box<dyn EngineData>,
+        is_log_batch: bool,
+    ) -> DeltaResult<Self::Output> {
+        let selection_vector = vec![true; batch.len()];
+
+        // Create the checkpoint visitor to process actions and update selection vector
+        let mut visitor = CheckpointVisitor::new(
+            &mut self.seen_file_keys,
+            is_log_batch,
+            selection_vector,
+            self.minimum_file_retention_timestamp,
+            self.seen_protocol,
+            self.seen_metadata,
+            &mut self.seen_txns,
+        );
+        visitor.visit_rows_of(batch.as_ref())?;
+
+        // Update the total actions and add actions counters. Relaxed ordering is sufficient
+        // here as we only care about the total count when writing the _last_checkpoint file.
+        // (the ordering is not important for correctness)
+        self.actions_count.fetch_add(
+            visitor.file_actions_count + visitor.non_file_actions_count,
+            Ordering::Relaxed,
+        );
+        self.add_actions_count
+            .fetch_add(visitor.add_actions_count, Ordering::Relaxed);
+
+        // Update protocol and metadata seen flags
+        self.seen_protocol = visitor.seen_protocol;
+        self.seen_metadata = visitor.seen_metadata;
+
+        Ok(FilteredEngineData {
+            data: batch,
+            selection_vector: visitor.selection_vector,
+        })
+    }
+
+    /// We never do data skipping for checkpoint log replay (entire table state is always reproduced)
+    fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> {
+        None
+    }
+}
+
+impl CheckpointLogReplayProcessor {
+    #[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
+    pub(crate) fn new(
+        actions_count: Arc<AtomicI64>,
+        add_actions_count: Arc<AtomicI64>,
+        minimum_file_retention_timestamp: i64,
+    ) -> Self {
+        Self {
+            seen_file_keys: Default::default(),
+            actions_count,
+            add_actions_count,
+            seen_protocol: false,
+            seen_metadata: false,
+            seen_txns: Default::default(),
+            minimum_file_retention_timestamp,
+        }
+    }
+}
 
 /// A visitor that filters actions for inclusion in a V1 spec checkpoint file.
 ///
@@ -137,13 +245,13 @@ impl CheckpointVisitor<'_> {
                 Self::REMOVE_DV_START_INDEX,
             ),
             selection_vector,
+            non_file_actions_count: 0,
             file_actions_count: 0,
             add_actions_count: 0,
             minimum_file_retention_timestamp,
             seen_protocol,
             seen_metadata,
             seen_txns,
-            non_file_actions_count: 0,
         }
    }

@@ -355,12 +463,37 @@ impl RowVisitor for CheckpointVisitor<'_> {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
-
+    use super::*;
     use crate::arrow::array::StringArray;
     use crate::utils::test_utils::{action_batch, parse_json_batch};
+    use std::collections::HashSet;
 
-    use super::*;
+    /// Helper function to create test batches from JSON strings
+    fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> {
+        Ok((parse_json_batch(StringArray::from(json_strings)), true))
+    }
+
+    /// Helper function which applies the [`CheckpointLogReplayProcessor`] to a set of
+    /// input batches and returns the results.
+    fn run_checkpoint_test(
+        input_batches: Vec<(Box<dyn EngineData>, bool)>,
+    ) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
+        let actions_count = Arc::new(AtomicI64::new(0));
+        let add_actions_count = Arc::new(AtomicI64::new(0));
+        let results: Vec<_> = CheckpointLogReplayProcessor::new(
+            actions_count.clone(),
+            add_actions_count.clone(),
+            0, // minimum_file_retention_timestamp
+        )
+        .process_actions_iter(input_batches.into_iter().map(Ok))
+        .collect::<DeltaResult<Vec<_>>>()?;
+
+        Ok((
+            results,
+            actions_count.load(Ordering::Relaxed),
+            add_actions_count.load(Ordering::Relaxed),
+        ))
+    }
 
     #[test]
     fn test_checkpoint_visitor() -> DeltaResult<()> {
@@ -370,7 +503,7 @@ mod tests {
         let mut visitor = CheckpointVisitor::new(
             &mut seen_file_keys,
             true,
-            vec![true; 8],
+            vec![true; 9],
             0, // minimum_file_retention_timestamp (no expired tombstones)
             false,
             false,
@@ -388,6 +521,7 @@ mod tests {
             false, // Row 5 is a cdc action (excluded)
             false, // Row 6 is a sidecar action (excluded)
             true, // Row 7 is a txn action (included)
+            false, // Row 8 is a checkpointMetadata action (excluded)
         ];
 
         assert_eq!(visitor.file_actions_count, 2);
@@ -477,14 +611,14 @@
     }
 
     #[test]
-    fn test_checkpoint_visitor_conflicts_with_deletion_vectors() -> DeltaResult<()> {
+    fn test_checkpoint_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
         let json_strings: StringArray = vec![
             // Add action for file1 with deletion vector
-            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"two","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-            // Remove action for file1 with a different deletion vector
-            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-            // Add action for file1 with the same deletion vector as the remove action above (excluded)
-            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // Remove action for file1 with a different deletion vector
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // Remove action for file1 with another different deletion vector
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"THREE","pathOrInlineDv":"dv3","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
         ]
         .into();
         let batch = parse_json_batch(json_strings);
@@ -503,9 +637,9 @@
 
         visitor.visit_rows_of(batch.as_ref())?;
 
-        let expected = vec![true, true, false];
+        let expected = vec![true, true, true];
         assert_eq!(visitor.selection_vector, expected);
-        assert_eq!(visitor.file_actions_count, 2);
+        assert_eq!(visitor.file_actions_count, 3);
         assert_eq!(visitor.add_actions_count, 1);
         assert_eq!(visitor.non_file_actions_count, 0);
 
@@ -585,4 +719,119 @@
 
         Ok(())
     }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// non-file actions (metadata, protocol, txn) across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_non_file_actions() -> DeltaResult<()> {
+        // Batch 1: protocol, metadata, and txn actions
+        let batch1 = vec![
+            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
+            r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
+            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
+        ];
+
+        // Batch 2: duplicate actions, and a new txn action
+        let batch2 = vec![
+            // Duplicates that should be skipped
+            r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#,
+            r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
+            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
+            // Unique transaction (appId) should be included
+            r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#,
+        ];
+
+        // Batch 3: a duplicate action (entire batch should be skipped)
+        let batch3 = vec![r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#];
+
+        let input_batches = vec![
+            create_batch(batch1)?,
+            create_batch(batch2)?,
+            create_batch(batch3)?,
+        ];
+        let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2, "Expected two batches in results");
+        assert_eq!(results[0].selection_vector, vec![true, true, true],);
+        assert_eq!(results[1].selection_vector, vec![false, false, false, true],);
+        assert_eq!(actions_count, 4);
+        assert_eq!(add_actions, 0);
+
+        Ok(())
+    }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// file actions (add, remove) across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_file_actions() -> DeltaResult<()> {
+        // Batch 1: add action (file1) - new, should be included
+        let batch1 = vec![
+            r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
+        ];
+
+        // Batch 2: remove actions - mixed inclusion
+        let batch2 = vec![
+            // Already seen file, should be excluded
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+            // New file, should be included
+            r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+        ];
+
+        // Batch 3: add action (file2) - already seen, should be excluded
+        let batch3 = vec![
+            r#"{"add":{"path":"file2","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
+        ];
+
+        let input_batches = vec![
+            create_batch(batch1)?,
+            create_batch(batch2)?,
+            create_batch(batch3)?,
+        ];
+        let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions
+        assert_eq!(results[0].selection_vector, vec![true]);
+        assert_eq!(results[1].selection_vector, vec![false, true]);
+        assert_eq!(actions_count, 2);
+        assert_eq!(add_actions, 1);
+
+        Ok(())
+    }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// file actions (add, remove) with deletion vectors across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_file_actions_with_deletion_vectors() -> DeltaResult<()> {
+        // Batch 1: add actions with deletion vectors
+        let batch1 = vec![
+            // (file1, DV_ONE) New, should be included
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // (file1, DV_TWO) New, should be included
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+        ];
+
+        // Batch 2: mixed actions with duplicate and new entries
+        let batch2 = vec![
+            // (file1, DV_ONE): Already seen, should be excluded
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // (file1, DV_TWO): Already seen, should be excluded
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // New file, should be included
+            r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+        ];
+
+        let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?];
+        let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2);
+        assert_eq!(results[0].selection_vector, vec![true, true]);
+        assert_eq!(results[1].selection_vector, vec![false, false, true]);
+        assert_eq!(actions_count, 3);
+        assert_eq!(add_actions, 2);
+
+        Ok(())
+    }
 }

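The core pattern in `process_actions_batch` above, keying file actions by `(path, dv_unique_id)` and publishing running totals through shared `Arc<AtomicI64>` counters, can be sketched in isolation. The following is a simplified, hypothetical model (the real `FileActionKey` and selection-vector plumbing live in the kernel); it shows why `file1` can legitimately appear once per distinct deletion vector while exact duplicates are deselected, and how the caller reads the totals after the iterator is drained:

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

// Simplified stand-in for the kernel's `FileActionKey`: a file action is
// deduplicated on (path, dv_unique_id), so the same path may appear once
// per distinct deletion vector.
type FileActionKey = (String, Option<String>);

fn process_batches(
    batches: Vec<Vec<FileActionKey>>,
    actions_count: Arc<AtomicI64>,
) -> Vec<Vec<bool>> {
    let mut seen: HashSet<FileActionKey> = HashSet::new();
    batches
        .into_iter()
        .map(|batch| {
            batch
                .into_iter()
                .map(|key| {
                    // First occurrence wins (the log is replayed in reverse
                    // chronological order); later duplicates are deselected.
                    let selected = seen.insert(key);
                    if selected {
                        actions_count.fetch_add(1, Ordering::Relaxed);
                    }
                    selected
                })
                .collect()
        })
        .collect()
}

fn main() {
    let count = Arc::new(AtomicI64::new(0));
    let key = |p: &str, dv: Option<&str>| (p.to_string(), dv.map(String::from));
    let batches = vec![
        vec![key("file1", Some("dv1")), key("file1", Some("dv2"))],
        vec![key("file1", Some("dv1")), key("file2", None)],
    ];
    let vectors = process_batches(batches, count.clone());
    assert_eq!(vectors, vec![vec![true, true], vec![false, true]]);
    assert_eq!(count.load(Ordering::Relaxed), 3);
}
```

Relaxed ordering suffices here for the same reason given in the diff: the counters are only read once the iterator has been fully consumed.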
kernel/src/checkpoint/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -5,4 +5,4 @@
 //! avoiding full log replay. This API supports three checkpoint types:
 //!
 //! TODO!(seb): Include docs when implemented
-mod log_replay;
+pub(crate) mod log_replay;

kernel/src/engine_data.rs

Lines changed: 8 additions & 0 deletions
@@ -1,5 +1,6 @@
 //! Traits that engines need to implement in order to pass data between themselves and kernel.
 
+use crate::log_replay::HasSelectionVector;
 use crate::schema::{ColumnName, DataType};
 use crate::{AsAny, DeltaResult, Error};
 
@@ -20,6 +21,13 @@ pub struct FilteredEngineData {
     pub selection_vector: Vec<bool>,
 }
 
+impl HasSelectionVector for FilteredEngineData {
+    /// Returns true if any row in the selection vector is marked as selected
+    fn has_selected_rows(&self) -> bool {
+        self.selection_vector.contains(&true)
+    }
+}
+
 /// a trait that an engine exposes to give access to a list
 pub trait EngineList {
     /// Return the length of the list at the specified row_index in the raw data
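The `HasSelectionVector` impl above is what allows batches with nothing selected to be dropped from the output (which is why the third batch disappears from the results in `test_checkpoint_actions_iter_file_actions`). A standalone sketch of that filtering step, using a hypothetical mirror of the trait:

```rust
// Hypothetical mirror of the kernel's `HasSelectionVector` trait and the
// filtering step that discards batches with no selected rows.
trait HasSelectionVector {
    fn has_selected_rows(&self) -> bool;
}

struct Batch {
    selection_vector: Vec<bool>,
}

impl HasSelectionVector for Batch {
    fn has_selected_rows(&self) -> bool {
        self.selection_vector.contains(&true)
    }
}

fn main() {
    let batches = vec![
        Batch { selection_vector: vec![true, false] },
        Batch { selection_vector: vec![false] }, // fully deselected: dropped
    ];
    // Keep only batches that still contribute at least one action.
    let kept: Vec<_> = batches.into_iter().filter(|b| b.has_selected_rows()).collect();
    assert_eq!(kept.len(), 1);
}
```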
