Merged

127 commits
435302e
introduce visitors
sebastiantia Mar 12, 2025
e500a10
remove pub
sebastiantia Mar 12, 2025
19733cd
assert! instead of assert_eq with bool
sebastiantia Mar 12, 2025
87c9f31
log replay for checkpoints
sebastiantia Mar 13, 2025
db5ccd0
rename & some clean up
sebastiantia Mar 13, 2025
42c08c1
remove new path for now
sebastiantia Mar 13, 2025
f91baeb
merge non file action visitor tests
sebastiantia Mar 22, 2025
9fdfba7
mvp for refactor
sebastiantia Mar 24, 2025
d420fd1
these github action checks clog my screen
sebastiantia Mar 24, 2025
9e0e048
base file actions struct
sebastiantia Mar 25, 2025
303444b
combine visitors
sebastiantia Mar 25, 2025
5dbc924
fmt
sebastiantia Mar 26, 2025
b793961
remove old code
sebastiantia Mar 26, 2025
508976f
move FileActionKey
sebastiantia Mar 26, 2025
bccaa17
Merge branch 'main' into checkpoint-visitors
sebastiantia Mar 26, 2025
a23d7cb
merge
sebastiantia Mar 26, 2025
0160ef1
fix whitespace
sebastiantia Mar 26, 2025
aae7046
remove old code
sebastiantia Mar 26, 2025
f574370
refactor more
sebastiantia Mar 26, 2025
a618833
refactor
sebastiantia Mar 26, 2025
7da74b2
more docs
sebastiantia Mar 26, 2025
220a216
invert is_log_batch logic
sebastiantia Mar 26, 2025
9d86911
docs
sebastiantia Mar 26, 2025
e5b0e32
docs
sebastiantia Mar 26, 2025
a5393dc
docs and imports
sebastiantia Mar 26, 2025
a23c651
improve mod doc
sebastiantia Mar 27, 2025
d712d18
improve doc
sebastiantia Mar 27, 2025
e564ae1
docs'
sebastiantia Mar 27, 2025
b14ff19
docs
sebastiantia Mar 27, 2025
a52d484
update
sebastiantia Mar 27, 2025
a243a98
nits
sebastiantia Mar 27, 2025
9f06382
Revert "nits"
sebastiantia Mar 28, 2025
58f38c0
nits
sebastiantia Mar 28, 2025
628546c
refactor
sebastiantia Mar 27, 2025
88cf983
move
sebastiantia Mar 27, 2025
10bb7b5
fix rebase
sebastiantia Mar 28, 2025
4b5a3a9
introduce visitors
sebastiantia Mar 12, 2025
1cb9364
assert! instead of assert_eq with bool
sebastiantia Mar 12, 2025
797a05c
merge non file action visitor tests
sebastiantia Mar 22, 2025
45c698d
base file actions struct
sebastiantia Mar 25, 2025
b062125
combine visitors
sebastiantia Mar 25, 2025
90e46cd
fmt
sebastiantia Mar 26, 2025
3c25392
remove old code
sebastiantia Mar 26, 2025
cba8ed6
move FileActionKey
sebastiantia Mar 26, 2025
28f1fb4
fix merge
sebastiantia Mar 27, 2025
48f831a
doc
sebastiantia Mar 27, 2025
7c3d976
docs
sebastiantia Mar 28, 2025
b2bb0ce
fix rebase
sebastiantia Mar 28, 2025
0054c71
merge
sebastiantia Mar 28, 2025
abc7e1f
merge fixes
sebastiantia Mar 28, 2025
964f294
docs
sebastiantia Mar 30, 2025
c026258
clean up and docs
sebastiantia Mar 30, 2025
88ba96c
docs
sebastiantia Mar 30, 2025
4c98c84
docs
sebastiantia Mar 30, 2025
c7cd2d1
Merge branch 'extract-deduplication-logic-from-addRemoveDedupVisitor'…
sebastiantia Mar 30, 2025
542166c
merge
sebastiantia Apr 1, 2025
655ed1d
fix merge
sebastiantia Apr 1, 2025
6c222a3
crate mod
sebastiantia Apr 1, 2025
30bd7d6
dev vis
sebastiantia Apr 1, 2025
159b0dd
merge
sebastiantia Apr 1, 2025
5777e5a
improve docs
sebastiantia Apr 1, 2025
5e6695b
Merge branch 'extract-log-replay-processing-structure' into checkpoin…
sebastiantia Apr 1, 2025
bdbc3fb
docs
sebastiantia Apr 1, 2025
6491113
breaking merge
sebastiantia Apr 1, 2025
95d0164
accept metadata & protocol param
sebastiantia Apr 1, 2025
51104aa
Merge branch 'checkpoint-visitors' into checkpoint-replay
sebastiantia Apr 1, 2025
7a59eab
improve docs
sebastiantia Apr 1, 2025
e4bc34e
docs
sebastiantia Apr 1, 2025
d24a80c
refactor into checkpoint mod
sebastiantia Apr 1, 2025
1981ab4
refactor into test_utils
sebastiantia Apr 1, 2025
f084424
rebase on test-utils refactor
sebastiantia Apr 2, 2025
6a28d99
merge
sebastiantia Apr 2, 2025
3488318
merge
sebastiantia Apr 2, 2025
c4e5522
redundant docs
sebastiantia Apr 2, 2025
18d1a29
fix doc
sebastiantia Apr 2, 2025
92b7296
Merge branch 'main' into checkpoint-visitors
sebastiantia Apr 2, 2025
6167cf2
merge
sebastiantia Apr 2, 2025
0d8b3c0
hoist selection vector and data skipping filter
sebastiantia Apr 3, 2025
43760a5
docs
sebastiantia Apr 3, 2025
1137be6
refactorg
sebastiantia Apr 3, 2025
6e3d722
docs
sebastiantia Apr 3, 2025
2252cec
match simplification
sebastiantia Apr 3, 2025
09f3930
docs
sebastiantia Apr 3, 2025
3efeef6
docs and rename
sebastiantia Apr 4, 2025
63f0294
nits and renames
sebastiantia Apr 4, 2025
fab97ba
rename
sebastiantia Apr 4, 2025
f79d9a5
priv mod
sebastiantia Apr 4, 2025
568b59e
docs
sebastiantia Apr 5, 2025
bce9384
clean up docs
sebastiantia Apr 6, 2025
87b17d4
polish docs
sebastiantia Apr 6, 2025
d8df2ea
notes
sebastiantia Apr 6, 2025
7f49ccd
fix indentation
sebastiantia Apr 6, 2025
e809306
merge
sebastiantia Apr 6, 2025
c9f6edd
bool flags
sebastiantia Apr 6, 2025
3f8a69b
Merge branch 'extract-log-replay-processing-structure' into checkpoin…
sebastiantia Apr 6, 2025
e520d1f
remove atomic counters
sebastiantia Apr 6, 2025
f31e51d
box counters
sebastiantia Apr 6, 2025
79d6ff8
review
sebastiantia Apr 7, 2025
a3cf0f2
revert
sebastiantia Apr 7, 2025
4416968
rc<refcell>
sebastiantia Apr 7, 2025
20fe7fe
unignore
sebastiantia Apr 7, 2025
29489d7
fix docs
sebastiantia Apr 7, 2025
ae22a6b
merge
sebastiantia Apr 7, 2025
5ccde93
oops
sebastiantia Apr 7, 2025
00c834b
docs
sebastiantia Apr 7, 2025
3c11320
clean up doc & test
sebastiantia Apr 7, 2025
4f61757
clean up docs
sebastiantia Apr 7, 2025
fdd4f68
update docs
sebastiantia Apr 7, 2025
f3257b4
Merge branch 'main' into checkpoint-visitors
sebastiantia Apr 7, 2025
9c992fc
merge
sebastiantia Apr 7, 2025
2aec9c3
remove mod docs in this PR
sebastiantia Apr 8, 2025
2e2062f
update docs
sebastiantia Apr 8, 2025
c92ea56
Merge branch 'checkpoint-visitors' into checkpoint-replay
sebastiantia Apr 8, 2025
fcb289d
docs
sebastiantia Apr 8, 2025
4d2029e
docs
sebastiantia Apr 9, 2025
e0d81ab
docs
sebastiantia Apr 9, 2025
e9de5bc
arc
sebastiantia Apr 9, 2025
0b609d5
merge
sebastiantia Apr 10, 2025
ab0a373
docs
sebastiantia Apr 10, 2025
9a9697a
test coverage
sebastiantia Apr 10, 2025
c7630a3
doc
sebastiantia Apr 10, 2025
48a0153
review
sebastiantia Apr 11, 2025
4a1a1dd
schema spec
sebastiantia Apr 11, 2025
4d48a8a
pub crate
sebastiantia Apr 11, 2025
411b2c4
forgot to include this file
sebastiantia Apr 11, 2025
48d529d
review
sebastiantia Apr 14, 2025
6a672d8
Merge branch 'main' into checkpoint-replay
sebastiantia Apr 14, 2025
308 changes: 289 additions & 19 deletions kernel/src/checkpoint/log_replay.rs
@@ -26,16 +26,145 @@
//! 1. Creates a visitor with the current deduplication state
//! 2. Applies the visitor to filter actions in the batch
//! 3. Updates counters and state for cross-batch deduplication
-//! 4. Produces a [`CheckpointData`] result which includes a selection vector indicating which
+//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which
//! actions should be included in the checkpoint file
-use std::collections::HashSet;
-use std::sync::LazyLock;
-
-use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
-use crate::log_replay::{FileActionDeduplicator, FileActionKey};
+use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
+use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor};
use crate::scan::data_skipping::DataSkippingFilter;
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
-use crate::{DeltaResult, Error};
+use crate::{DeltaResult, EngineData, Error};
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::{Arc, LazyLock};

/// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`]
/// trait that filters log segment actions for inclusion in a V1 spec checkpoint file.
///
/// It processes each action batch via the `process_actions_batch` method, using the
/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions
/// should be included in the checkpoint.
pub(crate) struct CheckpointLogReplayProcessor {
/// Tracks file actions that have been seen during log replay to avoid duplicates.
/// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
seen_file_keys: HashSet<FileActionKey>,
// Arc<AtomicI64> provides shared mutability for our counters, allowing both the
// iterator to update the values during processing and the caller to observe the final
// counts afterward. The counters are i64 to match the `_last_checkpoint` file schema.
// Tracks the total number of actions included in the checkpoint file.
total_actions: Arc<AtomicI64>,
// Tracks the total number of add actions included in the checkpoint file.
add_actions_count: Arc<AtomicI64>,
/// Indicates whether a protocol action has been seen in the log.
seen_protocol: bool,
/// Indicates whether a metadata action has been seen in the log.
seen_metadata: bool,
/// Set of transaction app IDs that have been processed to avoid duplicates.
seen_txns: HashSet<String>,
/// Minimum timestamp for file retention, used for filtering expired tombstones.
minimum_file_retention_timestamp: i64,
}
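The cross-batch role of `seen_file_keys` can be illustrated outside the kernel. The following self-contained sketch (all names are hypothetical; `FileKey` stands in for the kernel's `FileActionKey`) shows the core rule: batches are replayed newest-first, and only the first occurrence of each (path, deletion-vector id) pair is selected.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the kernel's `FileActionKey`:
/// a (data file path, optional deletion-vector id) pair.
type FileKey = (String, Option<String>);

/// Marks each action in a batch as selected (true) only if its key has not
/// been seen in a more recent batch. Batches must arrive newest-first.
fn select_unseen(seen: &mut HashSet<FileKey>, batch: &[FileKey]) -> Vec<bool> {
    batch
        .iter()
        // `insert` returns false for keys already present, i.e. duplicates.
        .map(|key| seen.insert(key.clone()))
        .collect()
}

fn main() {
    let mut seen = HashSet::new();
    // Newest batch: file1 without a DV, and file1 with DV "dv1".
    let batch1 = vec![
        ("file1".to_string(), None),
        ("file1".to_string(), Some("dv1".to_string())),
    ];
    // Older batch: file1 without a DV (duplicate key), file2 (new).
    let batch2 = vec![
        ("file1".to_string(), None),
        ("file2".to_string(), None),
    ];
    assert_eq!(select_unseen(&mut seen, &batch1), vec![true, true]);
    assert_eq!(select_unseen(&mut seen, &batch2), vec![false, true]);
}
```

Note that an add and a remove for the same (path, dv) map to the same key, which is why a remove in an older commit is suppressed once a newer action for that file has been selected.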

impl LogReplayProcessor for CheckpointLogReplayProcessor {
type Output = FilteredEngineData;

/// Processes a batch of actions read from the log during reverse chronological replay
/// and returns a filtered batch ([`FilteredEngineData`]) to be included in the checkpoint.
///
/// This method delegates the filtering logic to the [`CheckpointVisitor`], which implements
/// the deduplication rules described in the module documentation. The method tracks
/// statistics about processed actions (total count, add actions count) and maintains
/// state for cross-batch deduplication.
fn process_actions_batch(
&mut self,
batch: Box<dyn EngineData>,
is_log_batch: bool,
) -> DeltaResult<Self::Output> {
let selection_vector = vec![true; batch.len()];
assert_eq!(
selection_vector.len(),
batch.len(),
"Initial selection vector length does not match actions length"
);

// Create the checkpoint visitor to process actions and update selection vector
let mut visitor = CheckpointVisitor::new(
Review comment (Collaborator):
related to my comment in the other PR: does it work if we keep a checkpoint visitor within the LogReplayProcessor?

Reply (Collaborator, Author):
Discussed in other thread #738 (comment)
&mut self.seen_file_keys,
is_log_batch,
selection_vector,
self.minimum_file_retention_timestamp,
self.seen_protocol,
self.seen_metadata,
&mut self.seen_txns,
);
visitor.visit_rows_of(batch.as_ref())?;

// Update the total actions and add actions counters. Relaxed ordering is sufficient
// here as we only care about the total count when writing the _last_checkpoint file.
// (the ordering is not important for correctness)
self.total_actions.fetch_add(
visitor.file_actions_count + visitor.non_file_actions_count,
Ordering::Relaxed,
);
self.add_actions_count
.fetch_add(visitor.add_actions_count, Ordering::Relaxed);

// Update protocol and metadata seen flags
self.seen_protocol = visitor.seen_protocol;
self.seen_metadata = visitor.seen_metadata;

Ok(FilteredEngineData {
data: batch,
selection_vector: visitor.selection_vector,
})
}

/// Data skipping is not applicable for checkpoint log replay.
fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> {
None
}
}

impl CheckpointLogReplayProcessor {
pub(crate) fn new(
total_actions: Arc<AtomicI64>,
add_actions_count: Arc<AtomicI64>,
minimum_file_retention_timestamp: i64,
) -> Self {
Self {
seen_file_keys: Default::default(),
total_actions,
add_actions_count,
seen_protocol: false,
seen_metadata: false,
seen_txns: Default::default(),
minimum_file_retention_timestamp,
}
}
}

/// Given an iterator of (engine_data, bool) tuples, returns an iterator of
/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
/// be written to the V1 checkpoint file in order to capture the table version's complete state.
/// Non-selected rows _must_ be ignored. The boolean flag tied to each actions batch indicates
/// whether the batch is a commit batch (true) or a checkpoint batch (false).
///
/// Note: The 'action_iter' parameter is an iterator of (engine_data, bool) tuples that _must_ be
/// sorted by the order of the actions in the log from most recent to least recent.
#[allow(unused)] // TODO: Remove once API is implemented
pub(crate) fn checkpoint_actions_iter(
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
total_actions: Arc<AtomicI64>,
add_actions_count: Arc<AtomicI64>,
minimum_file_retention_timestamp: i64,
) -> impl Iterator<Item = DeltaResult<FilteredEngineData>> {
CheckpointLogReplayProcessor::new(
total_actions,
add_actions_count,
minimum_file_retention_timestamp,
)
.process_actions_iter(action_iter)
}
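The `Arc<AtomicI64>` counters in the signature above follow a common pattern: the returned iterator updates shared counters as it is driven, and the caller reads the totals after draining it. A minimal sketch of that pattern (the `counted` helper is hypothetical, not kernel API):

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Wraps an iterator so that every yielded item bumps a shared counter,
/// mirroring how `checkpoint_actions_iter` reports totals: the caller keeps
/// one `Arc` clone and observes the final value after iteration completes.
fn counted<I: Iterator<Item = i64>>(
    iter: I,
    total: Arc<AtomicI64>,
) -> impl Iterator<Item = i64> {
    iter.inspect(move |_| {
        // Relaxed is enough: we only read the final value after draining.
        total.fetch_add(1, Ordering::Relaxed);
    })
}

fn main() {
    let total = Arc::new(AtomicI64::new(0));
    let sum: i64 = counted(1i64..=4, total.clone()).sum();
    assert_eq!(sum, 10);
    assert_eq!(total.load(Ordering::Relaxed), 4);
}
```

Because the iterator is lazy, the counters are only meaningful once the caller has fully consumed it, which matches how the `_last_checkpoint` totals are read after the checkpoint data is written.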

/// A visitor that filters actions for inclusion in a V1 spec checkpoint file.
///
@@ -137,13 +266,13 @@ impl CheckpointVisitor<'_> {
Self::REMOVE_DV_START_INDEX,
),
selection_vector,
-non_file_actions_count: 0,
file_actions_count: 0,
add_actions_count: 0,
minimum_file_retention_timestamp,
seen_protocol,
seen_metadata,
seen_txns,
+non_file_actions_count: 0,
}
}

@@ -355,12 +484,38 @@ impl RowVisitor for CheckpointVisitor<'_> {

#[cfg(test)]
mod tests {
-use std::collections::HashSet;
-
-use super::*;
use crate::arrow::array::StringArray;
use crate::utils::test_utils::{action_batch, parse_json_batch};
use itertools::Itertools;
+use std::collections::HashSet;
+
+use super::*;
/// Helper function to create test batches from JSON strings
fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> {
Ok((parse_json_batch(StringArray::from(json_strings)), true))
}

/// Helper function which applies the `checkpoint_actions_iter` function to a set of
/// input batches and returns the results.
fn run_checkpoint_test(
input_batches: Vec<(Box<dyn EngineData>, bool)>,
) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
let total_actions = Arc::new(AtomicI64::new(0));
let add_actions_count = Arc::new(AtomicI64::new(0));
let results: Vec<_> = checkpoint_actions_iter(
input_batches.into_iter().map(Ok),
total_actions.clone(),
add_actions_count.clone(),
0,
)
.try_collect()?;

Ok((
results,
total_actions.load(Ordering::Relaxed),
add_actions_count.load(Ordering::Relaxed),
))
}

#[test]
fn test_checkpoint_visitor() -> DeltaResult<()> {
@@ -477,14 +632,14 @@ mod tests {
}

#[test]
-fn test_checkpoint_visitor_conflicts_with_deletion_vectors() -> DeltaResult<()> {
+fn test_checkpoint_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
let json_strings: StringArray = vec![
// Add action for file1 with deletion vector
-r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"two","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-// Remove action for file1 with a different deletion vector
-r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-// Add action for file1 with the same deletion vector as the remove action above (excluded)
-r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+// Remove action for file1 with a different deletion vector
+r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+// Remove action for file1 with another different deletion vector
+r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"THREE","pathOrInlineDv":"dv3","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
Review comment (sebastiantia, Author, Apr 10, 2025):
Updated this single-batch from a single-commit file test case as it's not a valid scenario:

"The intersection of the primary keys in the add collection and remove collection must be empty."

Instead we test repeating primary keys spanning across multiple commit files below in test_checkpoint_actions_iter_file_actions_with_deletion_vectors
]
.into();
let batch = parse_json_batch(json_strings);
@@ -503,9 +658,9 @@ mod tests {

visitor.visit_rows_of(batch.as_ref())?;

-let expected = vec![true, true, false];
+let expected = vec![true, true, true];
assert_eq!(visitor.selection_vector, expected);
-assert_eq!(visitor.file_actions_count, 2);
+assert_eq!(visitor.file_actions_count, 3);
assert_eq!(visitor.add_actions_count, 1);
assert_eq!(visitor.non_file_actions_count, 0);

@@ -585,4 +740,119 @@ mod tests {

Ok(())
}

/// This test ensures that the processor correctly deduplicates and filters
/// non-file actions (metadata, protocol, txn) across multiple batches.
#[test]
fn test_checkpoint_actions_iter_non_file_actions() -> DeltaResult<()> {
// Batch 1: protocol, metadata, and txn actions
let batch1 = vec![
r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
];

// Batch 2: duplicate actions, and a new txn action
let batch2 = vec![
// Duplicates that should be skipped
r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#,
r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
Review comment (Collaborator):
should we check the latest is returned by making this version different than above?

Reply (Collaborator, Author):
Right below this line is another txn action with a unique appId which covers the scenario you mentioned
// Unique transaction (appId) should be included
r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#,
];

// Batch 3: a duplicate action (entire batch should be skipped)
let batch3 = vec![r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#];

let input_batches = vec![
create_batch(batch1)?,
create_batch(batch2)?,
create_batch(batch3)?,
];
let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?;

// Verify results
assert_eq!(results.len(), 2, "Expected two batches in results");
assert_eq!(results[0].selection_vector, vec![true, true, true]);
assert_eq!(results[1].selection_vector, vec![false, false, false, true]);
assert_eq!(total_actions, 4);
assert_eq!(add_actions, 0);

Ok(())
}
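The dedup rules this test exercises (only the first protocol action, the first metadata action, and the first txn per appId survive replay) can be isolated in a small sketch. The `Action` enum and `NonFileDedup` struct below are hypothetical simplifications of the visitor's state, not kernel types:

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for parsed non-file log actions.
enum Action {
    Protocol,
    Metadata,
    Txn { app_id: String },
}

/// Simplified non-file-action filter for newest-first replay.
struct NonFileDedup {
    seen_protocol: bool,
    seen_metadata: bool,
    seen_txns: HashSet<String>,
}

impl NonFileDedup {
    fn new() -> Self {
        Self {
            seen_protocol: false,
            seen_metadata: false,
            seen_txns: HashSet::new(),
        }
    }

    /// Returns true if the action should be written to the checkpoint.
    fn select(&mut self, action: &Action) -> bool {
        match action {
            // `replace` returns the previous flag: only the first sighting wins.
            Action::Protocol => !std::mem::replace(&mut self.seen_protocol, true),
            Action::Metadata => !std::mem::replace(&mut self.seen_metadata, true),
            // One txn per appId: `insert` is false for a repeated appId.
            Action::Txn { app_id } => self.seen_txns.insert(app_id.clone()),
        }
    }
}

fn main() {
    let mut dedup = NonFileDedup::new();
    let batch = vec![
        Action::Protocol,
        Action::Metadata,
        Action::Txn { app_id: "app1".into() },
        Action::Protocol,                      // duplicate, dropped
        Action::Txn { app_id: "app1".into() }, // duplicate appId, dropped
        Action::Txn { app_id: "app2".into() }, // new appId, kept
    ];
    let sel: Vec<bool> = batch.iter().map(|a| dedup.select(a)).collect();
    assert_eq!(sel, vec![true, true, true, false, false, true]);
}
```

Because replay runs newest-first, "first sighting" here means "most recent action", which is exactly why the older duplicates in batch 2 and batch 3 of the test above are deselected.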

/// This test ensures that the processor correctly deduplicates and filters
/// file actions (add, remove) across multiple batches.
#[test]
fn test_checkpoint_actions_iter_file_actions() -> DeltaResult<()> {
// Batch 1: add action (file1) - new, should be included
let batch1 = vec![
r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
];

// Batch 2: remove actions - mixed inclusion
let batch2 = vec![
// Already seen file, should be excluded
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
// New file, should be included
r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
];

// Batch 3: add action (file2) - already seen, should be excluded
let batch3 = vec![
r#"{"add":{"path":"file2","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
];

let input_batches = vec![
create_batch(batch1)?,
create_batch(batch2)?,
create_batch(batch3)?,
];
let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?;

// Verify results
assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions
assert_eq!(results[0].selection_vector, vec![true]);
assert_eq!(results[1].selection_vector, vec![false, true]);
assert_eq!(total_actions, 2);
assert_eq!(add_actions, 1);

Ok(())
}

/// This test ensures that the processor correctly deduplicates and filters
/// file actions (add, remove) with deletion vectors across multiple batches.
#[test]
fn test_checkpoint_actions_iter_file_actions_with_deletion_vectors() -> DeltaResult<()> {
// Batch 1: add actions with deletion vectors
let batch1 = vec![
// (file1, DV_ONE) New, should be included
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
// (file1, DV_TWO) New, should be included
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
];

// Batch 2: mixed actions with duplicate and new entries
let batch2 = vec![
// (file1, DV_ONE): Already seen, should be excluded
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
// (file1, DV_TWO): Already seen, should be excluded
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
// New file, should be included
r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
];

let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?];
let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?;

// Verify results
assert_eq!(results.len(), 2);
assert_eq!(results[0].selection_vector, vec![true, true]);
assert_eq!(results[1].selection_vector, vec![false, false, true]);
assert_eq!(total_actions, 3);
assert_eq!(add_actions, 2);

Ok(())
}
Review comment (Collaborator):
are tests exhaustive? let's not block this PR but perhaps a follow up if we need to cover:

  • remove tombstones before/after expiration
  • we skip CommitInfo, CDC, CheckpointMetadata, Sidecar

Reply (Collaborator, Author):
I think both of your mentioned scenarios are sufficiently tested in the existing visitor level unit tests:

  1. test_checkpoint_visitor_boundary_cases_for_tombstone_expiration
  2. test_checkpoint_visitor – this test already verifies the exclusion of CommitInfo, cdc, and sidecar actions. (I'll update this test in this PR to also include the checkpointMetadata action in the input batch.)

I'm not sure it's worthwhile to add additional tests specifically for cross-batch behavior, but happy to revisit if you think there's value in doing so, let me know!
}
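The tombstone-expiration behavior raised in the review thread reduces to a single comparison against `minimum_file_retention_timestamp`. A sketch of the predicate, assuming (as the visitor's boundary-case test name suggests) that a tombstone whose deletion timestamp equals the threshold counts as expired:

```rust
/// Simplified sketch of the tombstone-retention rule: a remove tombstone is
/// kept in the checkpoint only while its deletion timestamp is strictly
/// newer than the minimum file retention timestamp. The boundary handling
/// (equal means expired) is an assumption of this sketch.
fn tombstone_is_live(deletion_timestamp: i64, minimum_file_retention_timestamp: i64) -> bool {
    deletion_timestamp > minimum_file_retention_timestamp
}

fn main() {
    let retention_cutoff = 100;
    assert!(tombstone_is_live(101, retention_cutoff));  // newer: kept
    assert!(!tombstone_is_live(100, retention_cutoff)); // boundary: expired
    assert!(!tombstone_is_live(99, retention_cutoff));  // older: expired
}
```

Note that the tests above pass `0` as the retention timestamp, which keeps every tombstone live and lets them focus purely on deduplication.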
8 changes: 8 additions & 0 deletions kernel/src/engine_data.rs
@@ -1,5 +1,6 @@
//! Traits that engines need to implement in order to pass data between themselves and kernel.
use crate::log_replay::HasSelectionVector;
use crate::schema::{ColumnName, DataType};
use crate::{AsAny, DeltaResult, Error};

@@ -20,6 +21,13 @@ pub struct FilteredEngineData {
pub selection_vector: Vec<bool>,
}

impl HasSelectionVector for FilteredEngineData {
/// Returns true if any row in the selection vector is marked as selected
fn has_selected_rows(&self) -> bool {
self.selection_vector.contains(&true)
}
}
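This impl is what lets the processor drop batches in which no rows survived filtering, which is why the tests above see fewer output batches than input batches. A standalone sketch of the same mechanism (the local trait and `Filtered` struct are stand-ins for the kernel types):

```rust
/// Stand-in for the kernel trait added in this PR: a filtered batch can
/// report whether any of its rows survived filtering.
trait HasSelectionVector {
    fn has_selected_rows(&self) -> bool;
}

/// Stand-in for `FilteredEngineData`, reduced to just a selection vector.
struct Filtered {
    selection_vector: Vec<bool>,
}

impl HasSelectionVector for Filtered {
    fn has_selected_rows(&self) -> bool {
        // True if at least one row is selected.
        self.selection_vector.contains(&true)
    }
}

fn main() {
    let batches = vec![
        Filtered { selection_vector: vec![true, false] },
        Filtered { selection_vector: vec![false, false] }, // dropped entirely
    ];
    // The processor skips batches with no selected rows instead of
    // forwarding empty work to the checkpoint writer.
    let kept: Vec<_> = batches
        .into_iter()
        .filter(|b| b.has_selected_rows())
        .collect();
    assert_eq!(kept.len(), 1);
}
```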

/// a trait that an engine exposes to give access to a list
pub trait EngineList {
/// Return the length of the list at the specified row_index in the raw data