feat: add CheckpointLogReplayProcessor in new checkpoints mod
#744
checkpoints/log_replay.rs
@@ -0,0 +1,248 @@
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::actions::visitors::CheckpointVisitor;
use crate::engine_data::RowVisitor;
use crate::log_replay::{
    apply_processor_to_iterator, FileActionKey, HasSelectionVector, LogReplayProcessor,
};
use crate::{DeltaResult, EngineData};

pub struct CheckpointData {
    #[allow(unused)]
    data: Box<dyn EngineData>,
    selection_vector: Vec<bool>,
}

impl HasSelectionVector for CheckpointData {
    fn has_selected_rows(&self) -> bool {
        self.selection_vector.contains(&true)
    }
}
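
// Note (a hedged aside, not part of the diff): `has_selected_rows` lets the shared
// log-replay iterator machinery drop batches whose selection vector is all `false`.
// The integration test below feeds three batches in and gets only two back, because
// every row of the third batch is a duplicate.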

/// `CheckpointLogReplayProcessor` is responsible for filtering actions during log
/// replay to include only those that should be included in a V1 checkpoint.
#[allow(unused)] // TODO: Remove once checkpoint_v1 API is implemented
struct CheckpointLogReplayProcessor {
    /// Tracks file actions that have been seen during log replay to avoid duplicates.
    /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
    seen_file_keys: HashSet<FileActionKey>,

    /// Counter for the total number of actions processed during log replay.
    total_actions: Arc<AtomicUsize>,

    /// Counter for the total number of add actions processed during log replay.
    total_add_actions: Arc<AtomicUsize>,

    /// Indicates whether a protocol action has been seen in the log.
    seen_protocol: bool,

    /// Indicates whether a metadata action has been seen in the log.
    seen_metadata: bool,

    /// Set of transaction app IDs that have been processed to avoid duplicates.
    seen_txns: HashSet<String>,

    /// Minimum timestamp for file retention, used for filtering expired tombstones.
    minimum_file_retention_timestamp: i64,
}
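
// Design note: both counters live behind `Arc<AtomicUsize>` so the caller that
// constructs the processor keeps its own handles to them and can read the final
// totals after the returned iterator has been fully consumed, as the test at the
// bottom of this file does.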

impl LogReplayProcessor for CheckpointLogReplayProcessor {
    // The processing result bundles each data batch with its selection vector
    type ProcessingResult = CheckpointData;

    /// This function processes batches of actions in reverse chronological order
    /// (from most recent to least recent) and performs the necessary filtering
    /// to ensure the checkpoint contains only the actions needed to reconstruct
    /// the complete state of the table.
    ///
    /// # Filtering Rules
    ///
    /// The following rules apply when filtering actions:
    ///
    /// 1. Only the most recent protocol and metadata actions are included
    /// 2. For each app ID, only the most recent transaction action is included
    /// 3. File actions are deduplicated based on path and unique ID
    /// 4. Tombstones older than `minimum_file_retention_timestamp` are excluded
    fn process_batch(
        &mut self,
        batch: Box<dyn EngineData>,
        is_log_batch: bool,
    ) -> DeltaResult<Self::ProcessingResult> {
        // Initialize the selection vector with all rows unselected
        let selection_vector = vec![false; batch.len()];
        assert_eq!(
            selection_vector.len(),
            batch.len(),
            "Initial selection vector length does not match actions length"
        );

        // Create the checkpoint visitor to process actions and update the selection vector
        let mut visitor = CheckpointVisitor::new(
            &mut self.seen_file_keys,
            selection_vector,
            is_log_batch,
            self.minimum_file_retention_timestamp,
            self.seen_protocol,
            self.seen_metadata,
            &mut self.seen_txns,
        );

        // Process actions and let the visitor update the selection vector
        visitor.visit_rows_of(batch.as_ref())?;

        // Update shared counters with the action counts from this batch
        self.total_actions.fetch_add(
            visitor.total_file_actions + visitor.total_non_file_actions,
            Ordering::SeqCst,
        );
        self.total_add_actions
            .fetch_add(visitor.total_add_actions, Ordering::SeqCst);

        // Update protocol and metadata seen flags
        self.seen_protocol = visitor.seen_protocol;
        self.seen_metadata = visitor.seen_metadata;

        Ok(CheckpointData {
            data: batch,
            selection_vector: visitor.deduplicator.selection_vector(),
        })
    }

    // Get a mutable reference to the set of seen file keys
    fn seen_file_keys(&mut self) -> &mut HashSet<FileActionKey> {
        &mut self.seen_file_keys
    }
}
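
// A small worked illustration of the rules above (action sequence assumed for the
// example, in the spirit of the integration test below). Replaying newest-first over:
//   v2: protocol(p2), metaData(m2), txn(app1, v5), add(f2)
//   v1: protocol(p1), metaData(m1), txn(app1, v4), add(f1), add(f2)
// keeps p2, m2, txn(app1, v5), add(f2), and add(f1); the older protocol, metadata,
// and app1 transaction, plus the duplicate add(f2), are all dropped.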

#[allow(unused)] // TODO: Remove once checkpoint_v1 API is implemented
impl CheckpointLogReplayProcessor {
    pub(super) fn new(
        total_actions_counter: Arc<AtomicUsize>,
        total_add_actions_counter: Arc<AtomicUsize>,
        minimum_file_retention_timestamp: i64,
    ) -> Self {
        Self {
            seen_file_keys: Default::default(),
            total_actions: total_actions_counter,
            total_add_actions: total_add_actions_counter,
            seen_protocol: false,
            seen_metadata: false,
            seen_txns: Default::default(),
            minimum_file_retention_timestamp,
        }
    }
}

/// Given an iterator of (engine_data, bool) tuples, returns an iterator of
/// `CheckpointData` results, each bundling the engine data with its selection vector.
/// Each row that is selected in the returned engine data _must_ be written to the V1
/// checkpoint file in order to capture the table version's complete state.
/// Non-selected rows _must_ be ignored. The boolean flag indicates whether the record
/// batch is a log or checkpoint batch.
///
/// Note: The iterator of (engine_data, bool) tuples must be sorted by the order of the
/// actions in the log from most recent to least recent.
#[allow(unused)] // TODO: Remove once checkpoint_v1 API is implemented
pub(crate) fn checkpoint_actions_iter(
    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send + 'static,
    total_actions_counter: Arc<AtomicUsize>,
    total_add_actions_counter: Arc<AtomicUsize>,
    minimum_file_retention_timestamp: i64,
) -> impl Iterator<Item = DeltaResult<CheckpointData>> + Send + 'static {
    let mut log_scanner = CheckpointLogReplayProcessor::new(
        total_actions_counter,
        total_add_actions_counter,
        minimum_file_retention_timestamp,
    );

    apply_processor_to_iterator(log_scanner, action_iter)
}

#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::arrow::array::StringArray;
    use crate::checkpoints::log_replay::checkpoint_actions_iter;
    use crate::utils::test_utils::parse_json_batch;
    use crate::DeltaResult;

    /// Tests the end-to-end processing of multiple batches with various action types.
    /// This tests the integration of the visitors with the main iterator function.
    /// More granular testing is performed in the individual visitor tests.
    #[test]
    fn test_v1_checkpoint_actions_iter_multi_batch_integration() -> DeltaResult<()> {
        // Set up counters
        let total_actions_counter = Arc::new(AtomicUsize::new(0));
        let total_add_actions_counter = Arc::new(AtomicUsize::new(0));

        // Create first batch with protocol, metadata, and some files
        let json_strings1: StringArray = vec![
            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
            r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
            r#"{"add":{"path":"file1","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"file2","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
        ].into();

        // Create second batch with some duplicates and new files
        let json_strings2: StringArray = vec![
            // Protocol and metadata should be skipped as duplicates
            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
            r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
            // New file
            r#"{"add":{"path":"file3","partitionValues":{},"size":800,"modificationTime":102,"dataChange":true}}"#,
            // Duplicate file should be skipped
            r#"{"add":{"path":"file1","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            // Transaction
            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#
        ].into();

        // Create third batch with all duplicate actions (should be filtered out completely)
        let json_strings3: StringArray = vec![
            r#"{"add":{"path":"file1","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
            r#"{"add":{"path":"file2","partitionValues":{"c1":"4","c2":"c"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
        ].into();

        let input_batches = vec![
            Ok((parse_json_batch(json_strings1), true)),
            Ok((parse_json_batch(json_strings2), true)),
            Ok((parse_json_batch(json_strings3), true)),
        ];

        // Run the iterator
        let results: Vec<_> = checkpoint_actions_iter(
            input_batches.into_iter(),
            total_actions_counter.clone(),
            total_add_actions_counter.clone(),
            0,
        )
        .collect::<Result<Vec<_>, _>>()?;

        // Expect two batches in the results (the third batch should be filtered out)
        assert_eq!(results.len(), 2);

        // First batch should have all rows selected
        let checkpoint_data = &results[0];
        assert_eq!(
            checkpoint_data.selection_vector,
            vec![true, true, true, true]
        );

        // Second batch should have only the new file and the transaction selected
        let checkpoint_data = &results[1];
        assert_eq!(
            checkpoint_data.selection_vector,
            vec![false, false, true, false, true]
        );

        // Verify counters
        // 6 total actions (4 from batch1 + 2 from batch2 + 0 from batch3)
        assert_eq!(total_actions_counter.load(Ordering::Relaxed), 6);

        // 3 add actions (2 from batch1 + 1 from batch2)
        assert_eq!(total_add_actions_counter.load(Ordering::Relaxed), 3);

        Ok(())
    }
}

checkpoints/mod.rs
@@ -0,0 +1 @@
pub mod log_replay;
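
For orientation, here is a minimal sketch of how a future checkpoint_v1 writer inside this crate might drive `checkpoint_actions_iter`. The `collect_v1_checkpoint_actions` helper, the `Batches` alias, and the `_last_checkpoint` remark are illustrative assumptions, not part of this PR:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::checkpoints::log_replay::{checkpoint_actions_iter, CheckpointData};
use crate::{DeltaResult, EngineData};

// Hypothetical alias for the (batch, is_log_batch) stream, ordered newest-to-oldest.
type Batches = Box<dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send>;

// Hypothetical helper: collect the filtered batches plus the final counter values.
fn collect_v1_checkpoint_actions(
    batches: Batches,
    minimum_file_retention_timestamp: i64,
) -> DeltaResult<(Vec<CheckpointData>, usize, usize)> {
    let total_actions = Arc::new(AtomicUsize::new(0));
    let total_add_actions = Arc::new(AtomicUsize::new(0));

    // Every batch that survives filtering has at least one selected row to write.
    let selected: Vec<CheckpointData> = checkpoint_actions_iter(
        batches,
        total_actions.clone(),
        total_add_actions.clone(),
        minimum_file_retention_timestamp,
    )
    .collect::<DeltaResult<_>>()?;

    // The counters are only final once the iterator has been exhausted; a real
    // writer could use them, for example, when producing the `_last_checkpoint` file.
    Ok((
        selected,
        total_actions.load(Ordering::Relaxed),
        total_add_actions.load(Ordering::Relaxed),
    ))
}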