-
Notifications
You must be signed in to change notification settings - Fork 139
feat: add CheckpointLogReplayProcessor in new checkpoints mod
#744
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
435302e
e500a10
19733cd
87c9f31
db5ccd0
42c08c1
f91baeb
9fdfba7
d420fd1
9e0e048
303444b
5dbc924
b793961
508976f
bccaa17
a23d7cb
0160ef1
aae7046
f574370
a618833
7da74b2
220a216
9d86911
e5b0e32
a5393dc
a23c651
d712d18
e564ae1
b14ff19
a52d484
a243a98
9f06382
58f38c0
628546c
88cf983
10bb7b5
4b5a3a9
1cb9364
797a05c
45c698d
b062125
90e46cd
3c25392
cba8ed6
28f1fb4
48f831a
7c3d976
b2bb0ce
0054c71
abc7e1f
964f294
c026258
88ba96c
4c98c84
c7cd2d1
542166c
655ed1d
6c222a3
30bd7d6
159b0dd
5777e5a
5e6695b
bdbc3fb
6491113
95d0164
51104aa
7a59eab
e4bc34e
d24a80c
1981ab4
f084424
6a28d99
3488318
c4e5522
18d1a29
92b7296
6167cf2
0d8b3c0
43760a5
1137be6
6e3d722
2252cec
09f3930
3efeef6
63f0294
fab97ba
f79d9a5
568b59e
bce9384
87b17d4
d8df2ea
7f49ccd
e809306
c9f6edd
3f8a69b
e520d1f
f31e51d
79d6ff8
a3cf0f2
4416968
20fe7fe
29489d7
ae22a6b
5ccde93
00c834b
3c11320
4f61757
fdd4f68
f3257b4
9c992fc
2aec9c3
2e2062f
c92ea56
fcb289d
4d2029e
e0d81ab
e9de5bc
0b609d5
ab0a373
9a9697a
c7630a3
48a0153
4a1a1dd
4d48a8a
411b2c4
48d529d
6a672d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,8 @@ | |
| //! 3. Updates counters and state for cross-batch deduplication | ||
| //! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which | ||
| //! actions should be included in the checkpoint file | ||
| #[cfg(doc)] | ||
| use crate::actions::CheckpointMetadata; | ||
| use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _}; | ||
| use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor}; | ||
| use crate::scan::data_skipping::DataSkippingFilter; | ||
|
|
@@ -39,11 +41,14 @@ use std::sync::atomic::{AtomicI64, Ordering}; | |
| use std::sync::{Arc, LazyLock}; | ||
|
|
||
| /// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`] | ||
| /// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. | ||
| /// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. This | ||
| /// processor is leveraged when creating a single-file V2 checkpoint as the V2 spec schema is | ||
| /// a superset of the V1 spec schema, with the addition of a [`CheckpointMetadata`] action. | ||
| /// | ||
| /// It processes each action batch via the `process_actions_batch` method, using the | ||
| /// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions | ||
| /// should be included in the checkpoint. | ||
| #[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented | ||
| pub(crate) struct CheckpointLogReplayProcessor { | ||
| /// Tracks file actions that have been seen during log replay to avoid duplicates. | ||
| /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances. | ||
|
|
@@ -52,7 +57,7 @@ pub(crate) struct CheckpointLogReplayProcessor { | |
| // iterator to update the values during processing and the caller to observe the final | ||
| // counts afterward. The counters are i64 to match the `_last_checkpoint` file schema. | ||
| // Tracks the total number of actions included in the checkpoint file. | ||
| total_actions: Arc<AtomicI64>, | ||
| actions_count: Arc<AtomicI64>, | ||
| // Tracks the total number of add actions included in the checkpoint file. | ||
| add_actions_count: Arc<AtomicI64>, | ||
| /// Indicates whether a protocol action has been seen in the log. | ||
|
|
@@ -81,11 +86,6 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor { | |
| is_log_batch: bool, | ||
| ) -> DeltaResult<Self::Output> { | ||
| let selection_vector = vec![true; batch.len()]; | ||
| assert_eq!( | ||
| selection_vector.len(), | ||
| batch.len(), | ||
| "Initial selection vector length does not match actions length" | ||
| ); | ||
|
|
||
| // Create the checkpoint visitor to process actions and update selection vector | ||
| let mut visitor = CheckpointVisitor::new( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. related to my comment in the other PR: does it work if we keep a checkpoint visitor within the
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed in other thread #738 (comment) |
||
|
|
@@ -102,7 +102,7 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor { | |
| // Update the total actions and add actions counters. Relaxed ordering is sufficient | ||
| // here as we only care about the total count when writing the _last_checkpoint file. | ||
| // (the ordering is not important for correctness) | ||
| self.total_actions.fetch_add( | ||
| self.actions_count.fetch_add( | ||
| visitor.file_actions_count + visitor.non_file_actions_count, | ||
| Ordering::Relaxed, | ||
| ); | ||
|
|
@@ -119,21 +119,22 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor { | |
| }) | ||
| } | ||
|
|
||
| /// Data skipping is not applicable for checkpoint log replay. | ||
| /// We never do data skipping for checkpoint log replay (entire table state is always reproduced) | ||
| fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> { | ||
| None | ||
| } | ||
| } | ||
|
|
||
| impl CheckpointLogReplayProcessor { | ||
| #[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented | ||
| pub(crate) fn new( | ||
| total_actions: Arc<AtomicI64>, | ||
| actions_count: Arc<AtomicI64>, | ||
| add_actions_count: Arc<AtomicI64>, | ||
| minimum_file_retention_timestamp: i64, | ||
| ) -> Self { | ||
| Self { | ||
| seen_file_keys: Default::default(), | ||
| total_actions, | ||
| actions_count, | ||
| add_actions_count, | ||
| seen_protocol: false, | ||
| seen_metadata: false, | ||
|
|
@@ -143,29 +144,6 @@ impl CheckpointLogReplayProcessor { | |
| } | ||
| } | ||
|
|
||
| /// Given an iterator of (engine_data, bool) tuples, returns an iterator of | ||
| /// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_ | ||
| /// be written to the V1 checkpoint file in order to capture the table version's complete state. | ||
| /// Non-selected rows _must_ be ignored. The boolean flag tied to each actions batch indicates | ||
| /// whether the batch is a commit batch (true) or a checkpoint batch (false). | ||
| /// | ||
| /// Note: The 'action_iter' parameter is an iterator of (engine_data, bool) tuples that _must_ be | ||
| /// sorted by the order of the actions in the log from most recent to least recent. | ||
| #[allow(unused)] // TODO: Remove once API is implemented | ||
| pub(crate) fn checkpoint_actions_iter( | ||
| action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>, | ||
| total_actions: Arc<AtomicI64>, | ||
| add_actions_count: Arc<AtomicI64>, | ||
| minimum_file_retention_timestamp: i64, | ||
| ) -> impl Iterator<Item = DeltaResult<FilteredEngineData>> { | ||
| CheckpointLogReplayProcessor::new( | ||
| total_actions, | ||
| add_actions_count, | ||
| minimum_file_retention_timestamp, | ||
| ) | ||
| .process_actions_iter(action_iter) | ||
| } | ||
|
|
||
| /// A visitor that filters actions for inclusion in a V1 spec checkpoint file. | ||
| /// | ||
| /// This visitor processes actions in newest-to-oldest order (as they appear in log | ||
|
|
@@ -487,32 +465,31 @@ mod tests { | |
| use super::*; | ||
| use crate::arrow::array::StringArray; | ||
| use crate::utils::test_utils::{action_batch, parse_json_batch}; | ||
| use itertools::Itertools; | ||
| use std::collections::HashSet; | ||
|
|
||
| /// Helper function to create test batches from JSON strings | ||
| fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> { | ||
| Ok((parse_json_batch(StringArray::from(json_strings)), true)) | ||
| } | ||
|
|
||
| /// Helper function which applies the `checkpoint_actions_iter` function to a set of | ||
| /// Helper function which applies the [`CheckpointLogReplayProcessor`] to a set of | ||
| /// input batches and returns the results. | ||
| fn run_checkpoint_test( | ||
| input_batches: Vec<(Box<dyn EngineData>, bool)>, | ||
| ) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> { | ||
| let total_actions = Arc::new(AtomicI64::new(0)); | ||
| let actions_count = Arc::new(AtomicI64::new(0)); | ||
| let add_actions_count = Arc::new(AtomicI64::new(0)); | ||
| let results: Vec<_> = checkpoint_actions_iter( | ||
| input_batches.into_iter().map(Ok), | ||
| total_actions.clone(), | ||
| let results: Vec<_> = CheckpointLogReplayProcessor::new( | ||
| actions_count.clone(), | ||
| add_actions_count.clone(), | ||
| 0, | ||
| 0, // minimum_file_retention_timestamp | ||
| ) | ||
| .try_collect()?; | ||
| .process_actions_iter(input_batches.into_iter().map(Ok)) | ||
| .collect::<DeltaResult<Vec<_>>>()?; | ||
|
|
||
| Ok(( | ||
| results, | ||
| total_actions.load(Ordering::Relaxed), | ||
| actions_count.load(Ordering::Relaxed), | ||
| add_actions_count.load(Ordering::Relaxed), | ||
| )) | ||
| } | ||
|
|
@@ -770,13 +747,13 @@ mod tests { | |
| create_batch(batch2)?, | ||
| create_batch(batch3)?, | ||
| ]; | ||
| let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?; | ||
| let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?; | ||
|
|
||
| // Verify results | ||
| assert_eq!(results.len(), 2, "Expected two batches in results"); | ||
| assert_eq!(results[0].selection_vector, vec![true, true, true],); | ||
| assert_eq!(results[1].selection_vector, vec![false, false, false, true],); | ||
| assert_eq!(total_actions, 4); | ||
| assert_eq!(actions_count, 4); | ||
| assert_eq!(add_actions, 0); | ||
|
|
||
| Ok(()) | ||
|
|
@@ -809,13 +786,13 @@ mod tests { | |
| create_batch(batch2)?, | ||
| create_batch(batch3)?, | ||
| ]; | ||
| let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?; | ||
| let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?; | ||
|
|
||
| // Verify results | ||
| assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions | ||
| assert_eq!(results[0].selection_vector, vec![true]); | ||
| assert_eq!(results[1].selection_vector, vec![false, true]); | ||
| assert_eq!(total_actions, 2); | ||
| assert_eq!(actions_count, 2); | ||
| assert_eq!(add_actions, 1); | ||
|
|
||
| Ok(()) | ||
|
|
@@ -844,13 +821,13 @@ mod tests { | |
| ]; | ||
|
|
||
| let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?]; | ||
| let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?; | ||
| let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?; | ||
|
|
||
| // Verify results | ||
| assert_eq!(results.len(), 2); | ||
| assert_eq!(results[0].selection_vector, vec![true, true]); | ||
| assert_eq!(results[1].selection_vector, vec![false, false, true]); | ||
| assert_eq!(total_actions, 3); | ||
| assert_eq!(actions_count, 3); | ||
| assert_eq!(add_actions, 2); | ||
|
|
||
| Ok(()) | ||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These mod docs will be updated with more detail in the following PR: #797. Leaving here for context. |
sebastiantia marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,12 +14,15 @@ | |
| //! deduplication with `FileActionDeduplicator` which tracks unique files across log batches | ||
| //! to minimize memory usage for tables with extensive history. | ||
|
|
||
| use std::collections::HashSet; | ||
|
|
||
| use crate::actions::deletion_vector::DeletionVectorDescriptor; | ||
| #[cfg(doc)] | ||
| use crate::checkpoint::log_replay::CheckpointLogReplayProcessor; | ||
| use crate::engine_data::{GetData, TypedGetData}; | ||
| use crate::scan::data_skipping::DataSkippingFilter; | ||
| #[cfg(doc)] | ||
| use crate::scan::{log_replay::ScanLogReplayProcessor, ScanMetadata}; | ||
sebastiantia marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| use crate::{DeltaResult, EngineData}; | ||
| use std::collections::HashSet; | ||
sebastiantia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| use tracing::debug; | ||
|
|
||
|
|
@@ -148,17 +151,15 @@ impl<'seen> FileActionDeduplicator<'seen> { | |
| /// This method examines the data at the given index using the provided getters | ||
| /// to identify whether a file action exists and what type it is. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `i` - Index position in the data structure to examine | ||
| /// * `getters` - Collection of data getter implementations used to access the data | ||
| /// * `skip_removes` - Whether to skip remove actions when extracting file actions | ||
| /// # Parameters | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor formatting changes |
||
| /// - `i`: Index position in the data structure to examine | ||
| /// - `getters`: Collection of data getter implementations used to access the data | ||
| /// - `skip_removes`: Whether to skip remove actions when extracting file actions | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation | ||
| /// * `Ok(None)` - When no file action is found | ||
| /// * `Err(...)` - On any error during extraction | ||
| /// - `Ok(Some((key, is_add)))`: When a file action is found, returns the key and whether it's an add operation | ||
| /// - `Ok(None)`: When no file action is found | ||
| /// - `Err(...)`: On any error during extraction | ||
| pub(crate) fn extract_file_action<'a>( | ||
| &self, | ||
| i: usize, | ||
|
|
@@ -207,38 +208,52 @@ impl<'seen> FileActionDeduplicator<'seen> { | |
| /// - **Data skipping** filters are applied to the initial selection vector to reduce the number of rows | ||
| /// processed by the processor, (if a filter is provided). | ||
| /// | ||
| /// Implementations: | ||
| /// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated | ||
| /// # Implementations | ||
| /// | ||
| /// - [`ScanLogReplayProcessor`]: Used for table scans, this processor filters and selects deduplicated | ||
| /// `Add` actions from log batches to reconstruct the view of the table at a specific point in time. | ||
| /// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is | ||
| /// provided. | ||
| /// | ||
| /// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct | ||
| /// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as | ||
| /// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state. | ||
| /// Data skipping is not applied during checkpoint processing. | ||
| /// - [`CheckpointLogReplayProcessor`]: Used for writing checkpoints, this processor filters and selects | ||
| /// actions from log batches for inclusion in V1 spec checkpoint files. Unlike scans, checkpoint | ||
| /// processing includes additional actions, such as `Remove`, `Metadata`, and `Protocol`, required to | ||
| /// fully reconstruct table state. Data skipping is not applied during checkpoint processing. | ||
| /// | ||
| /// The `Output` type represents the material result of log replay, and it must implement the | ||
| /// `HasSelectionVector` trait to allow filtering of irrelevant rows: | ||
| /// # Action Iterator Input | ||
| /// | ||
| /// - For **scans**, the output type is `ScanMetadata`, which contains the file actions (`Add` | ||
| /// The [`LogReplayProcessor::process_actions_iter`] method is the entry point for log replay processing. | ||
| /// It takes as input an iterator of (actions batch, is_commit_batch flag) tuples and returns an iterator of | ||
| /// processor-specific output types with selection vectors. The is_commit_batch bool flag in each tuple | ||
| /// indicates whether the batch came from a commit log (`true`) or checkpoint (`false`). Action batches | ||
| /// **must** be sorted by the order of the actions in the log from most recent to oldest. | ||
| /// | ||
| /// Each row that is selected in the returned output **must** be included in the processor's result | ||
| /// (e.g., in scan results or checkpoint files), while non-selected rows **must** be ignored. | ||
| /// | ||
| /// # Output Types | ||
| /// | ||
| /// The [`LogReplayProcessor::Output`] type represents the material result of log replay, and it must | ||
| /// implement the [`HasSelectionVector`] trait to allow filtering of irrelevant rows: | ||
| /// | ||
| /// - For **scans**, the output type is [`ScanMetadata`], which contains the file actions (`Add` | ||
| /// actions) that need to be applied to build the table's view, accompanied by a | ||
| /// **selection vector** that identifies which rows should be included. A transform vector may | ||
| /// also be included to handle schema changes, such as renaming columns or modifying data types. | ||
| /// | ||
| /// - For **checkpoints**, the output includes the actions necessary to write to the checkpoint file (`Add`, | ||
| /// `Remove`, `Metadata`, `Protocol` actions), filtered by the **selection vector** to determine which | ||
| /// rows are included in the final checkpoint. | ||
| /// - For **checkpoints**, the output type is [`FilteredEngineData`], which includes the actions | ||
| /// necessary to write to the checkpoint file (`Add`, `Remove`, `Metadata`, `Protocol` actions), | ||
| /// filtered by the **selection vector** to determine which rows are included in the final checkpoint. | ||
| /// | ||
| /// TODO: Refactor the Change Data Feed (CDF) processor to use this trait. | ||
| pub(crate) trait LogReplayProcessor: Sized { | ||
| /// The type of results produced by this processor must implement the | ||
| /// `HasSelectionVector` trait to allow filtering out batches with no selected rows. | ||
| /// [`HasSelectionVector`] trait to allow filtering out batches with no selected rows. | ||
| type Output: HasSelectionVector; | ||
|
|
||
| /// Processes a batch of actions and returns the filtered results. | ||
| /// | ||
| /// # Arguments | ||
| /// # Parameters | ||
| /// - `actions_batch` - A boxed [`EngineData`] instance representing a batch of actions. | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
|
||
| /// - `is_log_batch` - `true` if the batch originates from a commit log, `false` if from a checkpoint. | ||
| /// | ||
|
|
@@ -253,10 +268,19 @@ pub(crate) trait LogReplayProcessor: Sized { | |
|
|
||
| /// Applies the processor to an actions iterator and filters out empty results. | ||
| /// | ||
| /// # Arguments | ||
| /// * `action_iter` - Iterator of action batches and their source flags | ||
| /// This method: | ||
| /// 1. Applies `process_actions_batch` to each action batch | ||
| /// 2. Maintains processor state across all batches | ||
| /// 3. Automatically filters out batches with no selected rows | ||
| /// | ||
| /// # Parameters | ||
| /// - `action_iter`: Iterator of (batch, is_commit_batch) tuples, where each batch contains actions | ||
| /// and the boolean flag indicates whether the batch came from a commit log (`true`) or checkpoint | ||
| /// (`false`). Actions must be provided in reverse chronological order. | ||
| /// | ||
| /// Returns an iterator that yields the Output type of the processor. | ||
| /// # Returns | ||
| /// An iterator that yields the output type of the processor, containing only non-empty results | ||
| /// (batches where at least one row was selected). | ||
| fn process_actions_iter( | ||
| mut self, | ||
| action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>, | ||
|
|
@@ -281,8 +305,8 @@ pub(crate) trait LogReplayProcessor: Sized { | |
| /// The selection vector is further updated based on the processor's logic in the | ||
| /// `process_actions_batch` method. | ||
| /// | ||
| /// # Arguments | ||
| /// - `batch` - A reference to the batch of actions to be processed. | ||
| /// # Parameters | ||
| /// - `batch`: A reference to the batch of actions to be processed. | ||
| /// | ||
| /// # Returns | ||
| /// A `DeltaResult<Vec<bool>>`, where each boolean indicates if the corresponding row should be included. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.