diff --git a/kernel/src/checkpoint/log_replay.rs b/kernel/src/checkpoint/log_replay.rs
index 7867e042e1..e80d8ce56d 100644
--- a/kernel/src/checkpoint/log_replay.rs
+++ b/kernel/src/checkpoint/log_replay.rs
@@ -26,16 +26,124 @@
 //! 1. Creates a visitor with the current deduplication state
 //! 2. Applies the visitor to filter actions in the batch
 //! 3. Updates counters and state for cross-batch deduplication
-//! 4. Produces a [`CheckpointData`] result which includes a selection vector indicating which
+//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which
 //!    actions should be included in the checkpoint file
-use std::collections::HashSet;
-use std::sync::LazyLock;
-
-use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
-use crate::log_replay::{FileActionDeduplicator, FileActionKey};
+//!
+//! [`CheckpointMetadata`]: crate::actions::CheckpointMetadata
+use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
+use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor};
+use crate::scan::data_skipping::DataSkippingFilter;
 use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
 use crate::utils::require;
-use crate::{DeltaResult, Error};
+use crate::{DeltaResult, EngineData, Error};
+
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::{Arc, LazyLock};
+
+/// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`]
+/// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. This
+/// processor is also used when creating a single-file V2 checkpoint, since the V2 spec schema is
+/// a superset of the V1 spec schema, with the addition of a [`CheckpointMetadata`] action.
+///
+/// It processes each action batch via the `process_actions_batch` method, using the
+/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions
+/// should be included in the checkpoint.
+#[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
+pub(crate) struct CheckpointLogReplayProcessor {
+    /// Tracks file actions that have been seen during log replay to avoid duplicates.
+    /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
+    seen_file_keys: HashSet<FileActionKey>,
+    // Arc provides shared mutability for our counters, allowing both the
+    // iterator to update the values during processing and the caller to observe the final
+    // counts afterward. The counters are i64 to match the `_last_checkpoint` file schema.
+    // Tracks the total number of actions included in the checkpoint file.
+    actions_count: Arc<AtomicI64>,
+    // Tracks the total number of add actions included in the checkpoint file.
+    add_actions_count: Arc<AtomicI64>,
+    /// Indicates whether a protocol action has been seen in the log.
+    seen_protocol: bool,
+    /// Indicates whether a metadata action has been seen in the log.
+    seen_metadata: bool,
+    /// Set of transaction app IDs that have been processed to avoid duplicates.
+    seen_txns: HashSet<String>,
+    /// Minimum timestamp for file retention, used for filtering expired tombstones.
+    minimum_file_retention_timestamp: i64,
+}
+
+impl LogReplayProcessor for CheckpointLogReplayProcessor {
+    type Output = FilteredEngineData;
+
+    /// Processes a batch of actions read from the log during reverse chronological replay
+    /// and returns a filtered batch ([`FilteredEngineData`]) to be included in the checkpoint.
+    ///
+    /// This method delegates the filtering logic to the [`CheckpointVisitor`], which implements
+    /// the deduplication rules described in the module documentation. The method tracks
+    /// statistics about processed actions (total count, add actions count) and maintains
+    /// state for cross-batch deduplication.
+    fn process_actions_batch(
+        &mut self,
+        batch: Box<dyn EngineData>,
+        is_log_batch: bool,
+    ) -> DeltaResult<Self::Output> {
+        let selection_vector = vec![true; batch.len()];
+
+        // Create the checkpoint visitor to process actions and update selection vector
+        let mut visitor = CheckpointVisitor::new(
+            &mut self.seen_file_keys,
+            is_log_batch,
+            selection_vector,
+            self.minimum_file_retention_timestamp,
+            self.seen_protocol,
+            self.seen_metadata,
+            &mut self.seen_txns,
+        );
+        visitor.visit_rows_of(batch.as_ref())?;
+
+        // Update the total actions and add actions counters. Relaxed ordering is sufficient
+        // here as we only care about the total count when writing the _last_checkpoint file.
+        // (the ordering is not important for correctness)
+        self.actions_count.fetch_add(
+            visitor.file_actions_count + visitor.non_file_actions_count,
+            Ordering::Relaxed,
+        );
+        self.add_actions_count
+            .fetch_add(visitor.add_actions_count, Ordering::Relaxed);
+
+        // Update protocol and metadata seen flags
+        self.seen_protocol = visitor.seen_protocol;
+        self.seen_metadata = visitor.seen_metadata;
+
+        Ok(FilteredEngineData {
+            data: batch,
+            selection_vector: visitor.selection_vector,
+        })
+    }
+
+    /// We never do data skipping for checkpoint log replay (entire table state is always reproduced)
+    fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> {
+        None
+    }
+}
+
+impl CheckpointLogReplayProcessor {
+    #[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented
+    pub(crate) fn new(
+        actions_count: Arc<AtomicI64>,
+        add_actions_count: Arc<AtomicI64>,
+        minimum_file_retention_timestamp: i64,
+    ) -> Self {
+        Self {
+            seen_file_keys: Default::default(),
+            actions_count,
+            add_actions_count,
+            seen_protocol: false,
+            seen_metadata: false,
+            seen_txns: Default::default(),
+            minimum_file_retention_timestamp,
+        }
+    }
+}
 
 /// A visitor that filters actions for inclusion in a V1 spec checkpoint file.
 ///
@@ -137,13 +245,13 @@ impl CheckpointVisitor<'_> {
                 Self::REMOVE_DV_START_INDEX,
             ),
             selection_vector,
+            non_file_actions_count: 0,
             file_actions_count: 0,
             add_actions_count: 0,
             minimum_file_retention_timestamp,
             seen_protocol,
             seen_metadata,
             seen_txns,
-            non_file_actions_count: 0,
         }
     }
 
@@ -355,12 +463,37 @@ impl RowVisitor for CheckpointVisitor<'_> {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
-
+    use super::*;
     use crate::arrow::array::StringArray;
     use crate::utils::test_utils::{action_batch, parse_json_batch};
+    use std::collections::HashSet;
 
-    use super::*;
+    /// Helper function to create test batches from JSON strings
+    fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> {
+        Ok((parse_json_batch(StringArray::from(json_strings)), true))
+    }
+
+    /// Helper function which applies the [`CheckpointLogReplayProcessor`] to a set of
+    /// input batches and returns the results.
+    fn run_checkpoint_test(
+        input_batches: Vec<(Box<dyn EngineData>, bool)>,
+    ) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
+        let actions_count = Arc::new(AtomicI64::new(0));
+        let add_actions_count = Arc::new(AtomicI64::new(0));
+        let results: Vec<_> = CheckpointLogReplayProcessor::new(
+            actions_count.clone(),
+            add_actions_count.clone(),
+            0, // minimum_file_retention_timestamp
+        )
+        .process_actions_iter(input_batches.into_iter().map(Ok))
+        .collect::<DeltaResult<Vec<_>>>()?;
+
+        Ok((
+            results,
+            actions_count.load(Ordering::Relaxed),
+            add_actions_count.load(Ordering::Relaxed),
+        ))
+    }
 
     #[test]
     fn test_checkpoint_visitor() -> DeltaResult<()> {
@@ -370,7 +503,7 @@ mod tests {
         let mut visitor = CheckpointVisitor::new(
             &mut seen_file_keys,
             true,
-            vec![true; 8],
+            vec![true; 9],
             0, // minimum_file_retention_timestamp (no expired tombstones)
             false,
             false,
@@ -388,6 +521,7 @@ mod tests {
             false, // Row 5 is a cdc action (excluded)
             false, // Row 6 is a sidecar action (excluded)
             true, // Row 7 is a txn action (included)
+            false, // Row 8 is a checkpointMetadata action (excluded)
         ];
 
         assert_eq!(visitor.file_actions_count, 2);
@@ -477,14 +611,14 @@
     }
 
     #[test]
-    fn test_checkpoint_visitor_conflicts_with_deletion_vectors() -> DeltaResult<()> {
+    fn test_checkpoint_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
         let json_strings: StringArray = vec![
             // Add action for file1 with deletion vector
-            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"two","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-            // Remove action for file1 with a different deletion vector
-            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-            // Add action for file1 with the same deletion vector as the remove action above (excluded)
-            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // Remove action for file1 with a different deletion vector
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // Remove action for file1 with another different deletion vector
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"THREE","pathOrInlineDv":"dv3","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
         ]
         .into();
         let batch = parse_json_batch(json_strings);
@@ -503,9 +637,9 @@
         visitor.visit_rows_of(batch.as_ref())?;
 
-        let expected = vec![true, true, false];
+        let expected = vec![true, true, true];
         assert_eq!(visitor.selection_vector, expected);
-        assert_eq!(visitor.file_actions_count, 2);
+        assert_eq!(visitor.file_actions_count, 3);
         assert_eq!(visitor.add_actions_count, 1);
         assert_eq!(visitor.non_file_actions_count, 0);
 
@@ -585,4 +719,119 @@
 
         Ok(())
     }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// non-file actions (metadata, protocol, txn) across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_non_file_actions() -> DeltaResult<()> {
+        // Batch 1: protocol, metadata, and txn actions
+        let batch1 = vec![
+            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
+            r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
+            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
+        ];
+
+        // Batch 2: duplicate actions, and a new txn action
+        let batch2 = vec![
+            // Duplicates that should be skipped
+            r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#,
+            r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
+            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
+            // Unique transaction (appId) should be included
+            r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#,
+        ];
+
+        // Batch 3: a duplicate action (entire batch should be skipped)
+        let batch3 = vec![r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#];
+
+        let input_batches = vec![
+            create_batch(batch1)?,
+            create_batch(batch2)?,
+            create_batch(batch3)?,
+        ];
+        let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2, "Expected two batches in results");
+        assert_eq!(results[0].selection_vector, vec![true, true, true]);
+        assert_eq!(results[1].selection_vector, vec![false, false, false, true]);
+        assert_eq!(actions_count, 4);
+        assert_eq!(add_actions, 0);
+
+        Ok(())
+    }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// file actions (add, remove) across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_file_actions() -> DeltaResult<()> {
+        // Batch 1: add action (file1) - new, should be included
+        let batch1 = vec![
+            r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
+        ];
+
+        // Batch 2: remove actions - mixed inclusion
+        let batch2 = vec![
+            // Already seen file, should be excluded
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+            // New file, should be included
+            r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+        ];
+
+        // Batch 3: add action (file2) - already seen, should be excluded
+        let batch3 = vec![
+            r#"{"add":{"path":"file2","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
+        ];
+
+        let input_batches = vec![
+            create_batch(batch1)?,
+            create_batch(batch2)?,
+            create_batch(batch3)?,
+        ];
+        let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions
+        assert_eq!(results[0].selection_vector, vec![true]);
+        assert_eq!(results[1].selection_vector, vec![false, true]);
+        assert_eq!(actions_count, 2);
+        assert_eq!(add_actions, 1);
+
+        Ok(())
+    }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// file actions (add, remove) with deletion vectors across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_file_actions_with_deletion_vectors() -> DeltaResult<()> {
+        // Batch 1: add actions with deletion vectors
+        let batch1 = vec![
+            // (file1, DV_ONE) New, should be included
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // (file1, DV_TWO) New, should be included
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+        ];
+
+        // Batch 2: mixed actions with duplicate and new entries
+        let batch2 = vec![
+            // (file1, DV_ONE): Already seen, should be excluded
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // (file1, DV_TWO): Already seen, should be excluded
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // New file, should be included
+            r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+        ];
+
+        let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?];
+        let (results, actions_count, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2);
+        assert_eq!(results[0].selection_vector, vec![true, true]);
+        assert_eq!(results[1].selection_vector, vec![false, false, true]);
+        assert_eq!(actions_count, 3);
+        assert_eq!(add_actions, 2);
+
+        Ok(())
+    }
 }
diff --git a/kernel/src/checkpoint/mod.rs b/kernel/src/checkpoint/mod.rs
index e18479696d..490e2cf993 100644
--- a/kernel/src/checkpoint/mod.rs
+++ b/kernel/src/checkpoint/mod.rs
@@ -5,4 +5,4 @@
 //! avoiding full log replay. This API supports three checkpoint types:
 //!
 //! TODO!(seb): Include docs when implemented
-mod log_replay;
+pub(crate) mod log_replay;
diff --git a/kernel/src/engine_data.rs b/kernel/src/engine_data.rs
index 54cce0e260..44ada91e78 100644
--- a/kernel/src/engine_data.rs
+++ b/kernel/src/engine_data.rs
@@ -1,5 +1,6 @@
 //! Traits that engines need to implement in order to pass data between themselves and kernel.
 
+use crate::log_replay::HasSelectionVector;
 use crate::schema::{ColumnName, DataType};
 use crate::{AsAny, DeltaResult, Error};
 
@@ -20,6 +21,13 @@ pub struct FilteredEngineData {
     pub selection_vector: Vec<bool>,
 }
 
+impl HasSelectionVector for FilteredEngineData {
+    /// Returns true if any row in the selection vector is marked as selected
+    fn has_selected_rows(&self) -> bool {
+        self.selection_vector.contains(&true)
+    }
+}
+
 /// a trait that an engine exposes to give access to a list
 pub trait EngineList {
     /// Return the length of the list at the specified row_index in the raw data
diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs
index c9a58492f9..e6f8fdfb8e 100644
--- a/kernel/src/log_replay.rs
+++ b/kernel/src/log_replay.rs
@@ -13,16 +13,18 @@
 //! This module provides structures for efficient batch processing, focusing on file action
 //! deduplication with `FileActionDeduplicator` which tracks unique files across log batches
 //! to minimize memory usage for tables with extensive history.
-
-use std::collections::HashSet;
-
+//!
+//! [`CheckpointLogReplayProcessor`]: crate::checkpoint::log_replay::CheckpointLogReplayProcessor
+//! [`ScanLogReplayProcessor`]: crate::scan::log_replay::ScanLogReplayProcessor
+//! [`ScanMetadata`]: crate::scan::ScanMetadata
 use crate::actions::deletion_vector::DeletionVectorDescriptor;
 use crate::engine_data::{GetData, TypedGetData};
 use crate::scan::data_skipping::DataSkippingFilter;
 use crate::{DeltaResult, EngineData};
 
-use tracing::debug;
+use std::collections::HashSet;
 
+use tracing::debug;
 
 /// The subset of file action fields that uniquely identifies it in the log, used for deduplication
 /// of adds and removes during log replay.
 #[derive(Debug, Hash, Eq, PartialEq)]
@@ -148,17 +150,15 @@ impl<'seen> FileActionDeduplicator<'seen> {
     /// This method examines the data at the given index using the provided getters
     /// to identify whether a file action exists and what type it is.
     ///
-    /// # Arguments
-    ///
-    /// * `i` - Index position in the data structure to examine
-    /// * `getters` - Collection of data getter implementations used to access the data
-    /// * `skip_removes` - Whether to skip remove actions when extracting file actions
+    /// # Parameters
+    /// - `i`: Index position in the data structure to examine
+    /// - `getters`: Collection of data getter implementations used to access the data
+    /// - `skip_removes`: Whether to skip remove actions when extracting file actions
     ///
     /// # Returns
     ///
-    /// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
-    /// * `Ok(None)` - When no file action is found
-    /// * `Err(...)` - On any error during extraction
+    /// - `Ok(Some((key, is_add)))`: When a file action is found, returns the key and whether it's an add operation
+    /// - `Ok(None)`: When no file action is found
+    /// - `Err(...)`: On any error during extraction
     pub(crate) fn extract_file_action<'a>(
         &self,
         i: usize,
@@ -207,39 +207,53 @@ impl<'seen> FileActionDeduplicator<'seen> {
 /// - **Data skipping** filters are applied to the initial selection vector to reduce the number of rows
 /// processed by the processor, (if a filter is provided).
 ///
-/// Implementations:
-/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated
+/// # Implementations
+///
+/// - [`ScanLogReplayProcessor`]: Used for table scans, this processor filters and selects deduplicated
 /// `Add` actions from log batches to reconstruct the view of the table at a specific point in time.
 /// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is
 /// provided.
 ///
-/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct
-/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as
-/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state.
-/// Data skipping is not applied during checkpoint processing.
+/// - [`CheckpointLogReplayProcessor`]: Used for writing checkpoints, this processor filters and selects
+/// actions from log batches for inclusion in V1 spec checkpoint files. Unlike scans, checkpoint
+/// processing includes additional actions, such as `Remove`, `Metadata`, and `Protocol`, required to
+/// fully reconstruct table state. Data skipping is not applied during checkpoint processing.
+///
+/// # Action Iterator Input
+///
+/// The [`LogReplayProcessor::process_actions_iter`] method is the entry point for log replay processing.
+/// It takes as input an iterator of (actions batch, is_commit_batch flag) tuples and returns an iterator of
+/// processor-specific output types with selection vectors. The is_commit_batch bool flag in each tuple
+/// indicates whether the batch came from a commit log (`true`) or checkpoint (`false`). Action batches
+/// **must** be sorted by the order of the actions in the log from most recent to oldest.
 ///
-/// The `Output` type represents the material result of log replay, and it must implement the
-/// `HasSelectionVector` trait to allow filtering of irrelevant rows:
+/// Each row that is selected in the returned output **must** be included in the processor's result
+/// (e.g., in scan results or checkpoint files), while non-selected rows **must** be ignored.
 ///
-/// - For **scans**, the output type is `ScanMetadata`, which contains the file actions (`Add`
+/// # Output Types
+///
+/// The [`LogReplayProcessor::Output`] type represents the material result of log replay, and it must
+/// implement the [`HasSelectionVector`] trait to allow filtering of irrelevant rows:
+///
+/// - For **scans**, the output type is [`ScanMetadata`], which contains the file actions (`Add`
 /// actions) that need to be applied to build the table's view, accompanied by a
 /// **selection vector** that identifies which rows should be included. A transform vector may
 /// also be included to handle schema changes, such as renaming columns or modifying data types.
 ///
-/// - For **checkpoints**, the output includes the actions necessary to write to the checkpoint file (`Add`,
-/// `Remove`, `Metadata`, `Protocol` actions), filtered by the **selection vector** to determine which
-/// rows are included in the final checkpoint.
+/// - For **checkpoints**, the output type is [`FilteredEngineData`], which includes the actions
+/// necessary to write to the checkpoint file (`Add`, `Remove`, `Metadata`, `Protocol` actions),
+/// filtered by the **selection vector** to determine which rows are included in the final checkpoint.
 ///
 /// TODO: Refactor the Change Data Feed (CDF) processor to use this trait.
 pub(crate) trait LogReplayProcessor: Sized {
     /// The type of results produced by this processor must implement the
-    /// `HasSelectionVector` trait to allow filtering out batches with no selected rows.
+    /// [`HasSelectionVector`] trait to allow filtering out batches with no selected rows.
     type Output: HasSelectionVector;
 
     /// Processes a batch of actions and returns the filtered results.
     ///
-    /// # Arguments
-    /// - `actions_batch` - A reference to an [`EngineData`] instance representing a batch of actions.
+    /// # Parameters
+    /// - `actions_batch` - A boxed [`EngineData`] instance representing a batch of actions.
     /// - `is_log_batch` - `true` if the batch originates from a commit log, `false` if from a checkpoint.
     ///
     /// Returns a [`DeltaResult`] containing the processor’s output, which includes only selected actions.
@@ -247,16 +261,25 @@ pub(crate) trait LogReplayProcessor: Sized {
     /// Note: Since log replay is stateful, processing may update internal processor state (e.g., deduplication sets).
     fn process_actions_batch(
         &mut self,
-        actions_batch: &dyn EngineData,
+        actions_batch: Box<dyn EngineData>,
         is_log_batch: bool,
     ) -> DeltaResult<Self::Output>;
 
     /// Applies the processor to an actions iterator and filters out empty results.
     ///
-    /// # Arguments
-    /// * `action_iter` - Iterator of action batches and their source flags
+    /// This method:
+    /// 1. Applies `process_actions_batch` to each action batch
+    /// 2. Maintains processor state across all batches
+    /// 3. Automatically filters out batches with no selected rows
     ///
-    /// Returns an iterator that yields the Output type of the processor.
+    /// # Parameters
+    /// - `action_iter`: Iterator of (batch, is_commit_batch) tuples, where each batch contains actions
+    /// and the boolean flag indicates whether the batch came from a commit log (`true`) or checkpoint
+    /// (`false`). Actions must be provided in reverse chronological order.
+    ///
+    /// # Returns
+    /// An iterator that yields the output type of the processor, containing only non-empty results
+    /// (batches where at least one row was selected).
     fn process_actions_iter(
         mut self,
         action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
@@ -264,7 +287,7 @@ pub(crate) trait LogReplayProcessor: Sized {
         action_iter
             .map(move |action_res| {
                 let (batch, is_log_batch) = action_res?;
-                self.process_actions_batch(batch.as_ref(), is_log_batch)
+                self.process_actions_batch(batch, is_log_batch)
             })
             .filter(|res| {
                 // TODO: Leverage .is_none_or() when msrv = 1.82
@@ -281,8 +304,8 @@ pub(crate) trait LogReplayProcessor: Sized {
     /// The selection vector is further updated based on the processor's logic in the
     /// `process_actions_batch` method.
     ///
-    /// # Arguments
-    /// - `batch` - A reference to the batch of actions to be processed.
+    /// # Parameters
+    /// - `batch`: A reference to the batch of actions to be processed.
     ///
     /// # Returns
     /// A `DeltaResult<Vec<bool>>`, where each boolean indicates if the corresponding row should be included.
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index 4ae6d28a5f..29df6d8ca1 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -38,7 +38,7 @@ use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator};
 /// produces a [`ScanMetadata`] result. This result includes the transformed batch, a selection
 /// vector indicating which rows are valid, and any row-level transformation expressions that need
 /// to be applied to the selected rows.
-struct ScanLogReplayProcessor {
+pub(crate) struct ScanLogReplayProcessor {
     partition_filter: Option<ExpressionRef>,
     data_skipping_filter: Option<DataSkippingFilter>,
     add_transform: Arc<dyn ExpressionEvaluator>,
@@ -344,13 +344,13 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
 
     fn process_actions_batch(
         &mut self,
-        actions_batch: &dyn EngineData,
+        actions_batch: Box<dyn EngineData>,
         is_log_batch: bool,
     ) -> DeltaResult<Self::Output> {
         // Build an initial selection vector for the batch which has had the data skipping filter
         // applied. The selection vector is further updated by the deduplication visitor to remove
         // rows that are not valid adds.
-        let selection_vector = self.build_selection_vector(actions_batch)?;
+        let selection_vector = self.build_selection_vector(actions_batch.as_ref())?;
         assert_eq!(selection_vector.len(), actions_batch.len());
 
         let mut visitor = AddRemoveDedupVisitor::new(
@@ -361,10 +361,10 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
             self.partition_filter.clone(),
             is_log_batch,
         );
-        visitor.visit_rows_of(actions_batch)?;
+        visitor.visit_rows_of(actions_batch.as_ref())?;
 
         // TODO: Teach expression eval to respect the selection vector we just computed so carefully!
-        let result = self.add_transform.evaluate(actions_batch)?;
+        let result = self.add_transform.evaluate(actions_batch.as_ref())?;
         Ok(ScanMetadata::new(
             result,
             visitor.selection_vector,
@@ -381,6 +381,9 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
 /// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
 /// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag
 /// indicates whether the record batch is a log or checkpoint batch.
+///
+/// Note: The `action_iter` parameter (an iterator of (engine_data, bool) tuples) must be sorted in
+/// the order of the actions in the log, from most recent to least recent.
 pub(crate) fn scan_action_iter(
     engine: &dyn Engine,
     action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs
index 27e5bc3cfb..6eea84f8b3 100644
--- a/kernel/src/utils.rs
+++ b/kernel/src/utils.rs
@@ -13,11 +13,13 @@ pub(crate) use require;
 
 #[cfg(test)]
 pub(crate) mod test_utils {
-    use crate::actions::get_log_schema;
+    use crate::actions::{get_log_schema, Add, Cdc, CommitInfo, Metadata, Protocol, Remove};
     use crate::arrow::array::{RecordBatch, StringArray};
     use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::sync::SyncEngine;
     use crate::Engine;
+    use crate::EngineData;
 
     use itertools::Itertools;
     use object_store::local::LocalFileSystem;
@@ -27,12 +29,6 @@ pub(crate) mod test_utils {
     use tempfile::TempDir;
     use test_utils::delta_path_for_version;
 
-    use crate::{
-        actions::{Add, Cdc, CommitInfo, Metadata, Protocol, Remove},
-        engine::arrow_data::ArrowEngineData,
-        EngineData,
-    };
-
     #[derive(Serialize)]
     pub(crate) enum Action {
         #[serde(rename = "add")]
@@ -130,6 +126,7 @@ pub(crate) mod test_utils {
             r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#,
             r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{"tag_foo":"tag_bar"}}}"#,
             r#"{"txn":{"appId":"myApp","version": 3}}"#,
+            r#"{"checkpointMetadata":{"version":2, "tags":{"tag_foo":"tag_bar"}}}"#,
         ]
         .into();
         parse_json_batch(json_strings)
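
A rough sketch of the call pattern this change is designed for, mirroring the run_checkpoint_test helper added in the tests above; checkpoint_batches (an iterator of DeltaResult<(Box<dyn EngineData>, bool)> tuples in newest-to-oldest order) and minimum_file_retention_timestamp are illustrative placeholders, not part of this diff:

    // Shared counters let the caller read the totals for the _last_checkpoint
    // file after the result iterator has been drained.
    let actions_count = Arc::new(AtomicI64::new(0));
    let add_actions_count = Arc::new(AtomicI64::new(0));
    let processor = CheckpointLogReplayProcessor::new(
        actions_count.clone(),
        add_actions_count.clone(),
        minimum_file_retention_timestamp,
    );
    for filtered in processor.process_actions_iter(checkpoint_batches) {
        let filtered = filtered?;
        // Write only the rows of `filtered.data` whose entry in
        // `filtered.selection_vector` is true to the checkpoint file.
    }
    let total_actions = actions_count.load(Ordering::Relaxed);
    let total_add_actions = add_actions_count.load(Ordering::Relaxed);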