Skip to content
Merged
Show file tree
Hide file tree
Changes from 200 commits
Commits
Show all changes
205 commits
Select commit Hold shift + click to select a range
435302e
introduce visitors
sebastiantia Mar 12, 2025
e500a10
remove pub
sebastiantia Mar 12, 2025
19733cd
assert! instead of assert_eq with bool
sebastiantia Mar 12, 2025
87c9f31
log replay for checkpoints
sebastiantia Mar 13, 2025
db5ccd0
rename & some clean up
sebastiantia Mar 13, 2025
42c08c1
remove new path for now
sebastiantia Mar 13, 2025
f91baeb
merge non file action visitor tests
sebastiantia Mar 22, 2025
9fdfba7
mvp for refactor
sebastiantia Mar 24, 2025
d420fd1
these github action checks clog my screen
sebastiantia Mar 24, 2025
9e0e048
base file actions struct
sebastiantia Mar 25, 2025
303444b
combine visitors
sebastiantia Mar 25, 2025
5dbc924
fmt
sebastiantia Mar 26, 2025
b793961
remove old code
sebastiantia Mar 26, 2025
508976f
move FileActionKey
sebastiantia Mar 26, 2025
bccaa17
Merge branch 'main' into checkpoint-visitors
sebastiantia Mar 26, 2025
a23d7cb
merge
sebastiantia Mar 26, 2025
0160ef1
fix whitespace
sebastiantia Mar 26, 2025
aae7046
remove old code
sebastiantia Mar 26, 2025
f574370
refactor more
sebastiantia Mar 26, 2025
a618833
refactor
sebastiantia Mar 26, 2025
7da74b2
more docs
sebastiantia Mar 26, 2025
220a216
invert is_log_batch logic
sebastiantia Mar 26, 2025
9d86911
docs
sebastiantia Mar 26, 2025
e5b0e32
docs
sebastiantia Mar 26, 2025
a5393dc
docs and imports
sebastiantia Mar 26, 2025
a23c651
improve mod doc
sebastiantia Mar 27, 2025
d712d18
improve doc
sebastiantia Mar 27, 2025
e564ae1
docs'
sebastiantia Mar 27, 2025
b14ff19
docs
sebastiantia Mar 27, 2025
a52d484
update
sebastiantia Mar 27, 2025
a243a98
nits
sebastiantia Mar 27, 2025
9f06382
Revert "nits"
sebastiantia Mar 28, 2025
58f38c0
nits
sebastiantia Mar 28, 2025
628546c
refactor
sebastiantia Mar 27, 2025
88cf983
move
sebastiantia Mar 27, 2025
10bb7b5
fix rebase
sebastiantia Mar 28, 2025
4b5a3a9
introduce visitors
sebastiantia Mar 12, 2025
1cb9364
assert! instead of assert_eq with bool
sebastiantia Mar 12, 2025
797a05c
merge non file action visitor tests
sebastiantia Mar 22, 2025
45c698d
base file actions struct
sebastiantia Mar 25, 2025
b062125
combine visitors
sebastiantia Mar 25, 2025
90e46cd
fmt
sebastiantia Mar 26, 2025
3c25392
remove old code
sebastiantia Mar 26, 2025
cba8ed6
move FileActionKey
sebastiantia Mar 26, 2025
28f1fb4
fix merge
sebastiantia Mar 27, 2025
48f831a
doc
sebastiantia Mar 27, 2025
7c3d976
docs
sebastiantia Mar 28, 2025
b2bb0ce
fix rebase
sebastiantia Mar 28, 2025
0054c71
merge
sebastiantia Mar 28, 2025
abc7e1f
merge fixes
sebastiantia Mar 28, 2025
964f294
docs
sebastiantia Mar 30, 2025
c026258
clean up and docs
sebastiantia Mar 30, 2025
88ba96c
docs
sebastiantia Mar 30, 2025
4c98c84
docs
sebastiantia Mar 30, 2025
c7cd2d1
Merge branch 'extract-deduplication-logic-from-addRemoveDedupVisitor'…
sebastiantia Mar 30, 2025
542166c
merge
sebastiantia Apr 1, 2025
655ed1d
fix merge
sebastiantia Apr 1, 2025
6c222a3
crate mod
sebastiantia Apr 1, 2025
30bd7d6
dev vis
sebastiantia Apr 1, 2025
159b0dd
merge
sebastiantia Apr 1, 2025
5777e5a
improve docs
sebastiantia Apr 1, 2025
5e6695b
Merge branch 'extract-log-replay-processing-structure' into checkpoin…
sebastiantia Apr 1, 2025
bdbc3fb
docs
sebastiantia Apr 1, 2025
6491113
breaking merge
sebastiantia Apr 1, 2025
95d0164
accept metadata & protocol param
sebastiantia Apr 1, 2025
51104aa
Merge branch 'checkpoint-visitors' into checkpoint-replay
sebastiantia Apr 1, 2025
7a59eab
improve docs
sebastiantia Apr 1, 2025
e4bc34e
docs
sebastiantia Apr 1, 2025
d24a80c
refactor into checkpoint mod
sebastiantia Apr 1, 2025
1981ab4
refactor into test_utils
sebastiantia Apr 1, 2025
f084424
rebase on test-utils refactor
sebastiantia Apr 2, 2025
6a28d99
merge
sebastiantia Apr 2, 2025
3488318
merge
sebastiantia Apr 2, 2025
c4e5522
redundant docs
sebastiantia Apr 2, 2025
18d1a29
fix doc
sebastiantia Apr 2, 2025
7dccdea
wip
sebastiantia Apr 2, 2025
a9d6c81
include table API
sebastiantia Apr 2, 2025
fffd8f7
fix docs
sebastiantia Apr 2, 2025
92b7296
Merge branch 'main' into checkpoint-visitors
sebastiantia Apr 2, 2025
6167cf2
merge
sebastiantia Apr 2, 2025
0d8b3c0
hoist selection vector and data skipping filter
sebastiantia Apr 3, 2025
43760a5
docs
sebastiantia Apr 3, 2025
1137be6
refactorg
sebastiantia Apr 3, 2025
6e3d722
docs
sebastiantia Apr 3, 2025
2252cec
match simplification
sebastiantia Apr 3, 2025
09f3930
docs
sebastiantia Apr 3, 2025
3efeef6
docs and rename
sebastiantia Apr 4, 2025
63f0294
nits and renames
sebastiantia Apr 4, 2025
fab97ba
rename
sebastiantia Apr 4, 2025
f79d9a5
priv mod
sebastiantia Apr 4, 2025
568b59e
docs
sebastiantia Apr 5, 2025
bce9384
clean up docs
sebastiantia Apr 6, 2025
87b17d4
polish docs
sebastiantia Apr 6, 2025
d8df2ea
notes
sebastiantia Apr 6, 2025
7f49ccd
fix indentation
sebastiantia Apr 6, 2025
e809306
merge
sebastiantia Apr 6, 2025
c9f6edd
bool flags
sebastiantia Apr 6, 2025
3f8a69b
Merge branch 'extract-log-replay-processing-structure' into checkpoin…
sebastiantia Apr 6, 2025
e520d1f
remove atomic counters
sebastiantia Apr 6, 2025
f31e51d
box counters
sebastiantia Apr 6, 2025
79d6ff8
review
sebastiantia Apr 7, 2025
a3cf0f2
revert
sebastiantia Apr 7, 2025
326bea6
move logic to CheckpointWriter
sebastiantia Apr 7, 2025
4416968
rc<refcell>
sebastiantia Apr 7, 2025
7f7ac4e
merge
sebastiantia Apr 7, 2025
4ceaa50
update
sebastiantia Apr 7, 2025
20fe7fe
unignore
sebastiantia Apr 7, 2025
29489d7
fix docs
sebastiantia Apr 7, 2025
ae22a6b
merge
sebastiantia Apr 7, 2025
5ccde93
oops
sebastiantia Apr 7, 2025
00c834b
docs
sebastiantia Apr 7, 2025
3c11320
clean up doc & test
sebastiantia Apr 7, 2025
4f61757
clean up docs
sebastiantia Apr 7, 2025
fdd4f68
update docs
sebastiantia Apr 7, 2025
f3257b4
Merge branch 'main' into checkpoint-visitors
sebastiantia Apr 7, 2025
9c992fc
merge
sebastiantia Apr 7, 2025
09e8199
merge
sebastiantia Apr 7, 2025
72bb446
merge fix
sebastiantia Apr 7, 2025
e2ceee3
docs
sebastiantia Apr 7, 2025
1a5fcb4
remove checkpoint builder
sebastiantia Apr 8, 2025
f177492
docs
sebastiantia Apr 8, 2025
04d418e
docs
sebastiantia Apr 8, 2025
d3a97a7
priv
sebastiantia Apr 8, 2025
1adb100
tests and docs
sebastiantia Apr 8, 2025
9834c6d
fix builds
sebastiantia Apr 8, 2025
2aec9c3
remove mod docs in this PR
sebastiantia Apr 8, 2025
2e2062f
update docs
sebastiantia Apr 8, 2025
aed3ab6
remove file
sebastiantia Apr 8, 2025
c92ea56
Merge branch 'checkpoint-visitors' into checkpoint-replay
sebastiantia Apr 8, 2025
fcb289d
docs
sebastiantia Apr 8, 2025
4d2029e
docs
sebastiantia Apr 9, 2025
e0d81ab
docs
sebastiantia Apr 9, 2025
b4e28ee
review
sebastiantia Apr 9, 2025
544c42a
partial review
sebastiantia Apr 9, 2025
e8d1239
arc atomic
sebastiantia Apr 9, 2025
e9de5bc
arc
sebastiantia Apr 9, 2025
99d31a7
.finalize() with tests
sebastiantia Apr 10, 2025
0b609d5
merge
sebastiantia Apr 10, 2025
ab0a373
docs
sebastiantia Apr 10, 2025
9a9697a
test coverage
sebastiantia Apr 10, 2025
c7630a3
doc
sebastiantia Apr 10, 2025
011ec3f
merge
sebastiantia Apr 11, 2025
c58074b
fix merge
sebastiantia Apr 11, 2025
78fab5f
docs
sebastiantia Apr 11, 2025
64c720d
build & doc fixes
sebastiantia Apr 11, 2025
7c90c33
fmt
sebastiantia Apr 11, 2025
48a0153
review
sebastiantia Apr 11, 2025
4a1a1dd
schema spec
sebastiantia Apr 11, 2025
4d48a8a
pub crate
sebastiantia Apr 11, 2025
411b2c4
forgot to include this file
sebastiantia Apr 11, 2025
48d529d
review
sebastiantia Apr 14, 2025
5f55803
merge & reviews
sebastiantia Apr 14, 2025
b894936
merge
sebastiantia Apr 14, 2025
1bd9658
vis
sebastiantia Apr 14, 2025
2439ef3
compiling doc
sebastiantia Apr 15, 2025
1695392
docs
sebastiantia Apr 15, 2025
cf3faf8
CheckpointWriter error
sebastiantia Apr 15, 2025
aa01189
relaxed ordering
sebastiantia Apr 15, 2025
a9f9614
docs & last_checkpoint
sebastiantia Apr 15, 2025
14b7db7
schemas
sebastiantia Apr 15, 2025
8f985ca
test
sebastiantia Apr 15, 2025
17f1fd9
merge
sebastiantia Apr 15, 2025
9a76b22
fix flag
sebastiantia Apr 15, 2025
ffb02db
write
sebastiantia Apr 15, 2025
31da7d4
extract .finalize api to separate PR
sebastiantia Apr 15, 2025
3621121
remove comments
sebastiantia Apr 15, 2025
600cee6
doc
sebastiantia Apr 15, 2025
2371ed0
include issue
sebastiantia Apr 16, 2025
a04ca66
Merge branch 'main' into checkpoint-builder-and-writer
sebastiantia Apr 16, 2025
db38653
merge fix
sebastiantia Apr 16, 2025
d191912
reviews
sebastiantia Apr 16, 2025
2ece8a3
err handling
sebastiantia Apr 16, 2025
3d2532d
const
sebastiantia Apr 16, 2025
2d5890a
docs
sebastiantia Apr 16, 2025
a69f3f0
issue track
sebastiantia Apr 16, 2025
e84a55b
docs
sebastiantia Apr 17, 2025
457a95c
review
sebastiantia Apr 17, 2025
87aa6a9
merge
sebastiantia Apr 17, 2025
4ccf380
docs
sebastiantia Apr 17, 2025
ca6dbe3
checkpont metadata
sebastiantia Apr 18, 2025
ecaa9b0
channel instead of arc<atomic> for action counts
sebastiantia Apr 18, 2025
f3679e0
docs & fix
sebastiantia Apr 18, 2025
a6c3bf4
review
sebastiantia Apr 19, 2025
6d9c5cb
remove test file & spelling
sebastiantia Apr 21, 2025
affe0b5
reviews
sebastiantia Apr 21, 2025
37e7bea
review - remove channels, iterator as param, FileMeta instead of Engi…
sebastiantia Apr 21, 2025
ff92454
docs
sebastiantia Apr 21, 2025
83827e6
docs
sebastiantia Apr 21, 2025
87b5f1e
fix docs
sebastiantia Apr 21, 2025
b2b07ac
snapshot::checkpoint
sebastiantia Apr 22, 2025
ad8380e
docs
sebastiantia Apr 22, 2025
dbff206
doc test
sebastiantia Apr 22, 2025
7e403f1
doc
sebastiantia Apr 22, 2025
85cb57c
split checkpoint path
sebastiantia Apr 22, 2025
32e6714
reviews
sebastiantia Apr 22, 2025
de3252d
review
sebastiantia Apr 22, 2025
fe11091
review
sebastiantia Apr 22, 2025
ae86b11
docs
sebastiantia Apr 22, 2025
094f460
docs
sebastiantia Apr 22, 2025
893568a
note about not supporting uuid named checkpoints
sebastiantia Apr 22, 2025
d0b6e9b
impl Into
sebastiantia Apr 22, 2025
9e225bd
reviews
sebastiantia Apr 23, 2025
20bd8e6
Merge branch 'main' into checkpoint-builder-and-writer
sebastiantia Apr 28, 2025
be6ba68
fix merge
sebastiantia Apr 28, 2025
3efd39d
Merge branch 'main' into checkpoint-builder-and-writer
sebastiantia Apr 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ffi/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub enum KernelError {
ChangeDataFeedIncompatibleSchema,
InvalidCheckpoint,
LiteralExpressionTransformError,
CheckpointWriteError,
}

impl From<Error> for KernelError {
Expand All @@ -61,6 +62,7 @@ impl From<Error> for KernelError {
// NOTE: By definition, no kernel Error maps to FFIError
#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
Error::Arrow(_) => KernelError::ArrowError,
Error::CheckpointWrite(_) => KernelError::CheckpointWriteError,
Error::EngineDataType(_) => KernelError::EngineDataTypeError,
Error::Extract(..) => KernelError::ExtractError,
Error::Generic(_) => KernelError::GenericError,
Expand Down
108 changes: 52 additions & 56 deletions kernel/src/checkpoint/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,42 +25,31 @@
//! the overall process. For each batch of log actions, it:
//! 1. Creates a visitor with the current deduplication state
//! 2. Applies the visitor to filter actions in the batch
//! 3. Updates counters and state for cross-batch deduplication
//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which
//! actions should be included in the checkpoint file
//! 3. Tracks state for deduplication across batches
//! 4. Produces a [`CheckpointBatch`] result which includes both the filtered data and counts of
//! actions selected for the checkpoint file
//!
//! [`CheckpointMetadata`]: crate::actions::CheckpointMetadata
use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor};
use crate::log_replay::{
FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor,
};
use crate::scan::data_skipping::DataSkippingFilter;
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
use crate::{DeltaResult, EngineData, Error};

use std::collections::HashSet;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, LazyLock};
use std::sync::LazyLock;

/// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`]
/// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. This
/// processor is leveraged when creating a single-file V2 checkpoint as the V2 spec schema is
/// a superset of the V1 spec schema, with the addition of a [`CheckpointMetadata`] action.
///
/// It processes each action batch via the `process_actions_batch` method, using the
/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions
/// should be included in the checkpoint.
#[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
pub(crate) struct CheckpointLogReplayProcessor {
/// Tracks file actions that have been seen during log replay to avoid duplicates.
/// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
seen_file_keys: HashSet<FileActionKey>,
// Arc<AtomicI64> provides shared mutability for our counters, allowing both the
// iterator to update the values during processing and the caller to observe the final
// counts afterward. The counters are i64 to match the `_last_checkpoint` file schema.
// Tracks the total number of actions included in the checkpoint file.
actions_count: Arc<AtomicI64>,
// Tracks the total number of add actions included in the checkpoint file.
add_actions_count: Arc<AtomicI64>,
/// Indicates whether a protocol action has been seen in the log.
seen_protocol: bool,
/// Indicates whether a metadata action has been seen in the log.
Expand All @@ -71,11 +60,31 @@ pub(crate) struct CheckpointLogReplayProcessor {
minimum_file_retention_timestamp: i64,
}

/// This struct is the output of the [`CheckpointLogReplayProcessor`].
///
/// It contains the filtered batch of actions to be included in the checkpoint,
/// along with statistics about the number of actions filtered for inclusion.
pub(crate) struct CheckpointBatch {
/// The filtered batch of actions to be included in the checkpoint.
pub(crate) filtered_data: FilteredEngineData,
/// The number of actions in the batch filtered for inclusion in the checkpoint.
pub(crate) actions_count: i64,
/// The number of add actions in the batch filtered for inclusion in the checkpoint.
pub(crate) add_actions_count: i64,
}

impl HasSelectionVector for CheckpointBatch {
fn has_selected_rows(&self) -> bool {
self.filtered_data.has_selected_rows()
}
}

impl LogReplayProcessor for CheckpointLogReplayProcessor {
type Output = FilteredEngineData;
type Output = CheckpointBatch;

/// Processes a batch of actions read from the log during reverse chronological replay
/// and returns a filtered batch ([`FilteredEngineData`]) to be included in the checkpoint.
/// and returns a [`CheckpointBatch`], which contains the filtered actions to be
/// included in the checkpoint file, along with statistics about the included actions.
///
/// This method delegates the filtering logic to the [`CheckpointVisitor`], which implements
/// the deduplication rules described in the module documentation. The method tracks
Expand All @@ -100,23 +109,19 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor {
);
visitor.visit_rows_of(batch.as_ref())?;

// Update the total actions and add actions counters. Relaxed ordering is sufficient
// here as we only care about the total count when writing the _last_checkpoint file.
// (the ordering is not important for correctness)
self.actions_count.fetch_add(
visitor.file_actions_count + visitor.non_file_actions_count,
Ordering::Relaxed,
);
self.add_actions_count
.fetch_add(visitor.add_actions_count, Ordering::Relaxed);

// Update protocol and metadata seen flags
self.seen_protocol = visitor.seen_protocol;
self.seen_metadata = visitor.seen_metadata;

Ok(FilteredEngineData {
let filtered_data = FilteredEngineData {
data: batch,
selection_vector: visitor.selection_vector,
};

Ok(CheckpointBatch {
filtered_data,
actions_count: visitor.non_file_actions_count + visitor.file_actions_count,
add_actions_count: visitor.add_actions_count,
})
}

Expand All @@ -127,16 +132,9 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor {
}

impl CheckpointLogReplayProcessor {
#[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
pub(crate) fn new(
actions_count: Arc<AtomicI64>,
add_actions_count: Arc<AtomicI64>,
minimum_file_retention_timestamp: i64,
) -> Self {
pub(crate) fn new(minimum_file_retention_timestamp: i64) -> Self {
Self {
seen_file_keys: Default::default(),
actions_count,
add_actions_count,
seen_protocol: false,
seen_metadata: false,
seen_txns: Default::default(),
Expand Down Expand Up @@ -463,10 +461,13 @@ impl RowVisitor for CheckpointVisitor<'_> {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use super::*;
use crate::arrow::array::StringArray;
use crate::utils::test_utils::{action_batch, parse_json_batch};
use std::collections::HashSet;

use itertools::Itertools;

/// Helper function to create test batches from JSON strings
fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> {
Expand All @@ -478,23 +479,18 @@ mod tests {
fn run_checkpoint_test(
input_batches: Vec<(Box<dyn EngineData>, bool)>,
) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
let actions_count = Arc::new(AtomicI64::new(0));
let add_actions_count = Arc::new(AtomicI64::new(0));
let results: Vec<_> = CheckpointLogReplayProcessor::new(
actions_count.clone(),
add_actions_count.clone(),
0, // minimum_file_retention_timestamp
)
.process_actions_iter(input_batches.into_iter().map(Ok))
.collect::<DeltaResult<Vec<_>>>()?;

Ok((
results,
actions_count.load(Ordering::Relaxed),
add_actions_count.load(Ordering::Relaxed),
))
let processed_batches: Vec<_> = CheckpointLogReplayProcessor::new(0)
.process_actions_iter(input_batches.into_iter().map(Ok))
.try_collect()?;
let total_count: i64 = processed_batches.iter().map(|b| b.actions_count).sum();
let add_count: i64 = processed_batches.iter().map(|b| b.add_actions_count).sum();
let filtered_data = processed_batches
.into_iter()
.map(|b| b.filtered_data)
.collect();

Ok((filtered_data, total_count, add_count))
}

#[test]
fn test_checkpoint_visitor() -> DeltaResult<()> {
let data = action_batch();
Expand Down
Loading
Loading