feat: add CheckpointLogReplayProcessor in new checkpoints mod (#744)
Merged. +348 −68. Changes from 8 commits.

Commits (127, all by sebastiantia):
435302e introduce visitors
e500a10 remove pub
19733cd assert! instead of assert_eq with bool
87c9f31 log replay for checkpoints
db5ccd0 rename & some clean up
42c08c1 remove new path for now
f91baeb merge non file action visitor tests
9fdfba7 mvp for refactor
d420fd1 these github action checks clog my screen
9e0e048 base file actions struct
303444b combine visitors
5dbc924 fmt
b793961 remove old code
508976f move FileActionKey
bccaa17 Merge branch 'main' into checkpoint-visitors
a23d7cb merge
0160ef1 fix whitespace
aae7046 remove old code
f574370 refactor more
a618833 refactor
7da74b2 more docs
220a216 invert is_log_batch logic
9d86911 docs
e5b0e32 docs
a5393dc docs and imports
a23c651 improve mod doc
d712d18 improve doc
e564ae1 docs'
b14ff19 docs
a52d484 update
a243a98 nits
9f06382 Revert "nits"
58f38c0 nits
628546c refactor
88cf983 move
10bb7b5 fix rebase
4b5a3a9 introduce visitors
1cb9364 assert! instead of assert_eq with bool
797a05c merge non file action visitor tests
45c698d base file actions struct
b062125 combine visitors
90e46cd fmt
3c25392 remove old code
cba8ed6 move FileActionKey
28f1fb4 fix merge
48f831a doc
7c3d976 docs
b2bb0ce fix rebase
0054c71 merge
abc7e1f merge fixes
964f294 docs
c026258 clean up and docs
88ba96c docs
4c98c84 docs
c7cd2d1 Merge branch 'extract-deduplication-logic-from-addRemoveDedupVisitor'…
542166c merge
655ed1d fix merge
6c222a3 crate mod
30bd7d6 dev vis
159b0dd merge
5777e5a improve docs
5e6695b Merge branch 'extract-log-replay-processing-structure' into checkpoin…
bdbc3fb docs
6491113 breaking merge
95d0164 accept metadata & protocol param
51104aa Merge branch 'checkpoint-visitors' into checkpoint-replay
7a59eab improve docs
e4bc34e docs
d24a80c refactor into checkpoint mod
1981ab4 refactor into test_utils
f084424 rebase on test-utils refactor
6a28d99 merge
3488318 merge
c4e5522 redundant docs
18d1a29 fix doc
92b7296 Merge branch 'main' into checkpoint-visitors
6167cf2 merge
0d8b3c0 hoist selection vector and data skipping filter
43760a5 docs
1137be6 refactorg
6e3d722 docs
2252cec match simplification
09f3930 docs
3efeef6 docs and rename
63f0294 nits and renames
fab97ba rename
f79d9a5 priv mod
568b59e docs
bce9384 clean up docs
87b17d4 polish docs
d8df2ea notes
7f49ccd fix indentation
e809306 merge
c9f6edd bool flags
3f8a69b Merge branch 'extract-log-replay-processing-structure' into checkpoin…
e520d1f remove atomic counters
f31e51d box counters
79d6ff8 review
a3cf0f2 revert
4416968 rc<refcell>
20fe7fe unignore
29489d7 fix docs
ae22a6b merge
5ccde93 oops
00c834b docs
3c11320 clean up doc & test
4f61757 clean up docs
fdd4f68 update docs
f3257b4 Merge branch 'main' into checkpoint-visitors
9c992fc merge
2aec9c3 remove mod docs in this PR
2e2062f update docs
c92ea56 Merge branch 'checkpoint-visitors' into checkpoint-replay
fcb289d docs
4d2029e docs
e0d81ab docs
e9de5bc arc
0b609d5 merge
ab0a373 docs
9a9697a test coverage
c7630a3 doc
48a0153 review
4a1a1dd schema spec
4d48a8a pub crate
411b2c4 forgot to include this file
48d529d review
6a672d8 Merge branch 'main' into checkpoint-replay
New file (+240 lines): the checkpoints log replay module.

```rust
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::actions::visitors::{CheckpointFileActionsVisitor, CheckpointNonFileActionsVisitor};
use crate::engine_data::RowVisitor;
use crate::log_replay::{FileActionKey, LogReplayProcessor};
use crate::{DeltaResult, EngineData};

/// `CheckpointLogReplayProcessor` is responsible for filtering actions during log
/// replay to include only those that should be included in a V1 checkpoint.
#[allow(unused)] // TODO: Remove once checkpoint_v1 API is implemented
struct CheckpointLogReplayProcessor {
    /// Tracks file actions that have been seen during log replay to avoid duplicates.
    /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
    seen_file_keys: HashSet<FileActionKey>,

    /// Counter for the total number of actions processed during log replay.
    total_actions: Arc<AtomicUsize>,

    /// Counter for the total number of add actions processed during log replay.
    total_add_actions: Arc<AtomicUsize>,

    /// Indicates whether a protocol action has been seen in the log.
    seen_protocol: bool,

    /// Indicates whether a metadata action has been seen in the log.
    seen_metadata: bool,

    /// Set of transaction app IDs that have been processed to avoid duplicates.
    seen_txns: HashSet<String>,

    /// Minimum timestamp for file retention, used for filtering expired tombstones.
    minimum_file_retention_timestamp: i64,
}
```
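The dedup mechanics behind `seen_file_keys` are worth spelling out: a file action is identified by its data file path plus its deletion vector's unique ID, and replay skips any key it has already seen. A minimal standalone sketch of the idea (this `FileActionKey` is a hypothetical stand-in for the real type in `crate::log_replay`):

```rust
use std::collections::HashSet;

// Hypothetical stand-in for `crate::log_replay::FileActionKey`: per the
// field docs above, a (data file path, dv_unique_id) pair.
#[derive(Debug, Hash, PartialEq, Eq)]
struct FileActionKey {
    path: String,
    dv_unique_id: Option<String>,
}

fn main() {
    let mut seen_file_keys: HashSet<FileActionKey> = HashSet::new();

    // First sighting of ("file1", no DV): `insert` returns true, so the
    // row would be selected for the checkpoint.
    assert!(seen_file_keys.insert(FileActionKey {
        path: "file1".to_string(),
        dv_unique_id: None,
    }));

    // Second sighting of the same key: `insert` returns false, so the
    // row would be skipped as a duplicate.
    assert!(!seen_file_keys.insert(FileActionKey {
        path: "file1".to_string(),
        dv_unique_id: None,
    }));
}
```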
```rust
impl LogReplayProcessor for CheckpointLogReplayProcessor {
    // Define the processing result type as a tuple of the data and selection vector
    type ProcessingResult = (Box<dyn EngineData>, Vec<bool>);

    /// This function processes batches of actions in reverse chronological order
    /// (from most recent to least recent) and performs the necessary filtering
    /// to ensure the checkpoint contains only the actions needed to reconstruct
    /// the complete state of the table.
    ///
    /// # Filtering Rules
    ///
    /// The following rules apply when filtering actions:
    ///
    /// 1. Only the most recent protocol and metadata actions are included
    /// 2. For each app ID, only the most recent transaction action is included
    /// 3. File actions are deduplicated based on path and unique ID
    /// 4. Tombstones older than `minimum_file_retention_timestamp` are excluded
    fn process_batch(
        &mut self,
        batch: Box<dyn EngineData>,
        is_log_batch: bool,
    ) -> DeltaResult<Self::ProcessingResult> {
        // Initialize the selection vector with all rows unselected
        let mut selection_vector = vec![false; batch.len()];
        assert_eq!(
            selection_vector.len(),
            batch.len(),
            "Initial selection vector length does not match actions length"
        );

        // Create the non-file actions visitor to process non-file actions
        // and update the selection vector
        let mut non_file_actions_visitor = CheckpointNonFileActionsVisitor {
            seen_protocol: &mut self.seen_protocol,
            seen_metadata: &mut self.seen_metadata,
            seen_txns: &mut self.seen_txns,
            selection_vector: &mut selection_vector,
            total_actions: 0,
        };

        // Process actions and let the visitor update the selection vector
        non_file_actions_visitor.visit_rows_of(batch.as_ref())?;

        // Update shared counters with non-file action counts from this batch
        self.total_actions
            .fetch_add(non_file_actions_visitor.total_actions, Ordering::Relaxed);

        // Create the file actions visitor to process file actions
        // and update the selection vector
        let mut file_actions_visitor = CheckpointFileActionsVisitor {
            seen_file_keys: &mut self.seen_file_keys,
            is_log_batch,
            selection_vector: &mut selection_vector,
            total_actions: 0,
            total_add_actions: 0,
            minimum_file_retention_timestamp: self.minimum_file_retention_timestamp,
        };

        // Process actions and let the visitor update the selection vector
        file_actions_visitor.visit_rows_of(batch.as_ref())?;

        // Update shared counters with file action counts from this batch
        self.total_actions
            .fetch_add(file_actions_visitor.total_actions, Ordering::Relaxed);
        self.total_add_actions
            .fetch_add(file_actions_visitor.total_add_actions, Ordering::Relaxed);

        Ok((batch, selection_vector))
    }

    // Get a mutable reference to the set of seen file keys
    fn seen_file_keys(&mut self) -> &mut HashSet<FileActionKey> {
        &mut self.seen_file_keys
    }
}
```
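To make the returned `(batch, selection vector)` contract concrete: rows flagged `true` must be written to the checkpoint and the rest ignored. A minimal sketch with plain strings standing in for `EngineData` rows (the row labels mirror batch 2 of the test below):

```rust
fn main() {
    // Toy stand-ins for one batch of actions and the selection vector a
    // processor might produce for it.
    let rows = vec!["protocol", "metaData", "add file3", "add file1 (dup)", "txn app1"];
    let selection_vector = vec![false, false, true, false, true];

    // A checkpoint writer keeps exactly the rows whose flag is true.
    let selected: Vec<&str> = rows
        .into_iter()
        .zip(selection_vector)
        .filter(|&(_, keep)| keep)
        .map(|(row, _)| row)
        .collect();

    assert_eq!(selected, vec!["add file3", "txn app1"]);
}
```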
```rust
#[allow(unused)] // TODO: Remove once checkpoint_v1 API is implemented
impl CheckpointLogReplayProcessor {
    pub(super) fn new(
        total_actions_counter: Arc<AtomicUsize>,
        total_add_actions_counter: Arc<AtomicUsize>,
        minimum_file_retention_timestamp: i64,
    ) -> Self {
        Self {
            seen_file_keys: Default::default(),
            total_actions: total_actions_counter,
            total_add_actions: total_add_actions_counter,
            seen_protocol: false,
            seen_metadata: false,
            seen_txns: Default::default(),
            minimum_file_retention_timestamp,
        }
    }
}
```
```rust
/// Given an iterator of (engine_data, bool) tuples, returns an iterator of
/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
/// be written to the V1 checkpoint file in order to capture the table version's complete state.
/// Non-selected rows _must_ be ignored. The boolean flag indicates whether the record batch
/// is a log or checkpoint batch.
///
/// Note: The iterator of (engine_data, bool) tuples must be sorted by the order of the actions in
/// the log from most recent to least recent.
#[allow(unused)] // TODO: Remove once checkpoint_v1 API is implemented
pub(crate) fn checkpoint_actions_iter(
    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send + 'static,
    total_actions_counter: Arc<AtomicUsize>,
    total_add_actions_counter: Arc<AtomicUsize>,
    minimum_file_retention_timestamp: i64,
) -> impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, Vec<bool>)>> + Send + 'static {
    let mut log_scanner = CheckpointLogReplayProcessor::new(
        total_actions_counter,
        total_add_actions_counter,
        minimum_file_retention_timestamp,
    );

    action_iter
        .map(move |action_res| {
            let (batch, is_log_batch) = action_res?;
            log_scanner.process_batch(batch, is_log_batch)
        })
        // Only yield batches that have at least one selected row
        .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))
}
```
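Since the checkpoint_v1 API itself is still a TODO, here is a hedged sketch of how a crate-internal caller might eventually drive this iterator; the `write_v1_checkpoint` wrapper, its batch source, and the zero retention timestamp are placeholders, not part of this PR:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::checkpoints::log_replay::checkpoint_actions_iter;
use crate::{DeltaResult, EngineData};

// Hypothetical caller: `actions_newest_first` must yield
// (batch, is_log_batch) tuples ordered from most recent to least recent.
fn write_v1_checkpoint(
    actions_newest_first: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>
        + Send
        + 'static,
) -> DeltaResult<()> {
    let total_actions_counter = Arc::new(AtomicUsize::new(0));
    let total_add_actions_counter = Arc::new(AtomicUsize::new(0));

    let checkpoint_data = checkpoint_actions_iter(
        actions_newest_first,
        total_actions_counter.clone(),
        total_add_actions_counter.clone(),
        0, // minimum_file_retention_timestamp; a real caller would derive this
    );

    for res in checkpoint_data {
        let (_batch, _selection_vector) = res?;
        // A real writer would serialize exactly the rows of `_batch` whose
        // `_selection_vector` entry is true into the checkpoint file.
    }

    // The shared counters now hold totals across all batches, e.g. for
    // checkpoint metadata.
    let _ = total_actions_counter.load(Ordering::Relaxed);
    Ok(())
}
```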
```rust
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::arrow::array::StringArray;
    use crate::checkpoints::log_replay::checkpoint_actions_iter;
    use crate::utils::test_utils::parse_json_batch;
    use crate::DeltaResult;

    /// Tests the end-to-end processing of multiple batches with various action types.
    /// This tests the integration of the visitors with the main iterator function.
    /// More granular testing is performed in the individual visitor tests.
    #[test]
    fn test_v1_checkpoint_actions_iter_multi_batch_integration() -> DeltaResult<()> {
        // Set up counters
        let total_actions_counter = Arc::new(AtomicUsize::new(0));
        let total_add_actions_counter = Arc::new(AtomicUsize::new(0));

        // Create first batch with protocol, metadata, and some files
        let json_strings1: StringArray = vec![
            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
            r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
            r#"{"add":{"path":"file1","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"file2","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
        ].into();

        // Create second batch with some duplicates and new files
        let json_strings2: StringArray = vec![
            // Protocol and metadata should be skipped as duplicates
            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
            r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
            // New files
            r#"{"add":{"path":"file3","partitionValues":{},"size":800,"modificationTime":102,"dataChange":true}}"#,
            // Duplicate file should be skipped
            r#"{"add":{"path":"file1","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            // Transaction
            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#
        ].into();

        // Create third batch with all duplicate actions (should be filtered out completely)
        let json_strings3: StringArray = vec![
            r#"{"add":{"path":"file1","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"file2","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
        ].into();

        let input_batches = vec![
            Ok((parse_json_batch(json_strings1), true)),
            Ok((parse_json_batch(json_strings2), true)),
            Ok((parse_json_batch(json_strings3), true)),
        ];

        // Run the iterator
        let results: Vec<_> = checkpoint_actions_iter(
            input_batches.into_iter(),
            total_actions_counter.clone(),
            total_add_actions_counter.clone(),
            0,
        )
        .collect::<Result<Vec<_>, _>>()?;

        // Expect two batches in results (third batch should be filtered)
        assert_eq!(results.len(), 2);

        // First batch should have all rows selected
        let (_, selection_vector1) = &results[0];
        assert_eq!(selection_vector1, &vec![true, true, true, true]);

        // Second batch should have only the new file and transaction selected
        let (_, selection_vector2) = &results[1];
        assert_eq!(selection_vector2, &vec![false, false, true, false, true]);

        // Verify counters
        // 6 total actions (4 from batch1 + 2 from batch2 + 0 from batch3)
        assert_eq!(total_actions_counter.load(Ordering::Relaxed), 6);

        // 3 add actions (2 from batch1 + 1 from batch2)
        assert_eq!(total_add_actions_counter.load(Ordering::Relaxed), 3);

        Ok(())
    }
}
```
New file (+1 line): the checkpoints module root.

```rust
pub mod log_replay;
```