diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 1f310e8659..d40a515e6f 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -77,6 +77,7 @@ pub mod actions;
 pub mod engine_data;
 pub mod error;
 pub mod expressions;
+pub mod log_replay;
 pub mod scan;
 pub mod schema;
 pub mod snapshot;
diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs
new file mode 100644
index 0000000000..9eddae2931
--- /dev/null
+++ b/kernel/src/log_replay.rs
@@ -0,0 +1,192 @@
+//! This module provides log replay utilities.
+//!
+//! Log replay is the process of transforming an iterator of action batches (read from Delta
+//! transaction logs) into an iterator of filtered/transformed actions for specific use cases.
+//! The logs, which record all table changes as JSON entries, are processed batch by batch,
+//! typically from newest to oldest.
+//!
+//! Log replay is currently implemented for table scans, which filter and apply transformations
+//! to produce the file actions that build the view of the table state at a specific point in
+//! time. Future extensions will support additional log replay processors beyond this use case
+//! (e.g. checkpointing, which filters actions down to only those needed to rebuild table state).
+//!
+//! This module provides structures for efficient batch processing, focusing on file action
+//! deduplication with `FileActionDeduplicator`, which tracks unique files across log batches
+//! to minimize memory usage for tables with extensive history.
+
+use std::collections::HashSet;
+
+use crate::actions::deletion_vector::DeletionVectorDescriptor;
+use crate::engine_data::{GetData, TypedGetData};
+use crate::DeltaResult;
+
+use tracing::debug;
+
+/// The subset of file action fields that uniquely identifies a file action in the log, used for
+/// deduplication of adds and removes during log replay.
+#[derive(Debug, Hash, Eq, PartialEq)]
+pub(crate) struct FileActionKey {
+    pub(crate) path: String,
+    pub(crate) dv_unique_id: Option<String>,
+}
+impl FileActionKey {
+    pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
+        let path = path.into();
+        Self { path, dv_unique_id }
+    }
+}
+
+/// Maintains state and provides functionality for deduplicating file actions during log replay.
+///
+/// This struct is embedded in visitors to track which files have been seen across multiple
+/// log batches. Since logs are processed newest-to-oldest, this deduplicator ensures that each
+/// unique file (identified by path and deletion vector ID) is processed only once. Performing
+/// deduplication at the visitor level avoids having to load all actions into memory at once,
+/// significantly reducing memory usage for large Delta tables with extensive history.
+pub(crate) struct FileActionDeduplicator<'seen> {
+    /// A set of (data file path, dv_unique_id) pairs that have been seen thus
+    /// far in the log for deduplication. This is a mutable reference to the set
+    /// of seen file keys that persists across multiple log batches.
+    seen_file_keys: &'seen mut HashSet<FileActionKey>,
+    // TODO: Consider renaming to `is_commit_batch`, `deduplicate_batch`, or `save_batch`
+    // to better reflect its role in deduplication logic.
+    /// Whether we're processing a log batch (as opposed to a checkpoint)
+    is_log_batch: bool,
+    /// Index of the getter containing the add.path column
+    add_path_index: usize,
+    /// Index of the getter containing the remove.path column
+    remove_path_index: usize,
+    /// Starting index for add action deletion vector columns
+    add_dv_start_index: usize,
+    /// Starting index for remove action deletion vector columns
+    remove_dv_start_index: usize,
+}
+
+impl<'seen> FileActionDeduplicator<'seen> {
+    pub(crate) fn new(
+        seen_file_keys: &'seen mut HashSet<FileActionKey>,
+        is_log_batch: bool,
+        add_path_index: usize,
+        remove_path_index: usize,
+        add_dv_start_index: usize,
+        remove_dv_start_index: usize,
+    ) -> Self {
+        Self {
+            seen_file_keys,
+            is_log_batch,
+            add_path_index,
+            remove_path_index,
+            add_dv_start_index,
+            remove_dv_start_index,
+        }
+    }
+
+    /// Checks if log replay already processed this logical file (in which case the current action
+    /// should be ignored). If not already seen, register it so we can recognize future duplicates.
+    /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
+    /// and should process it.
+    pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
+        // Note: each (add.path + add.dv_unique_id()) pair has a
+        // unique Add + Remove pair in the log. For example:
+        // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json
+
+        if self.seen_file_keys.contains(&key) {
+            debug!(
+                "Ignoring duplicate ({}, {:?}) in scan, is log {}",
+                key.path, key.dv_unique_id, self.is_log_batch
+            );
+            true
+        } else {
+            debug!(
+                "Including ({}, {:?}) in scan, is log {}",
+                key.path, key.dv_unique_id, self.is_log_batch
+            );
+            if self.is_log_batch {
+                // Remember file actions from this batch so we can ignore duplicates as we process
+                // batches from older commit and/or checkpoint files. We don't track checkpoint
+                // batches because they are already the oldest actions and never replace anything.
+                self.seen_file_keys.insert(key);
+            }
+            false
+        }
+    }
+
+    /// Extracts the deletion vector unique ID if it exists.
+    ///
+    /// This function retrieves the necessary fields for constructing a deletion vector unique ID
+    /// by accessing `getters` at `dv_start_index` and the following two indices. Specifically:
+    /// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`).
+    /// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`).
+    /// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`).
+    fn extract_dv_unique_id<'a>(
+        &self,
+        i: usize,
+        getters: &[&'a dyn GetData<'a>],
+        dv_start_index: usize,
+    ) -> DeltaResult<Option<String>> {
+        match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? {
+            Some(storage_type) => {
+                let path_or_inline =
+                    getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
+                let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;
+
+                Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
+                    storage_type,
+                    path_or_inline,
+                    offset,
+                )))
+            }
+            None => Ok(None),
+        }
+    }
+
+    /// Extracts a file action key and determines whether it's an add operation.
+    /// This method examines the data at the given index using the provided getters
+    /// to identify whether a file action exists and what type it is.
+    ///
+    /// # Arguments
+    ///
+    /// * `i` - Index position in the data structure to examine
+    /// * `getters` - Collection of data getter implementations used to access the data
+    /// * `skip_removes` - Whether to skip remove actions when extracting file actions
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
+    /// * `Ok(None)` - When no file action is found
+    /// * `Err(...)` - On any error during extraction
+    pub(crate) fn extract_file_action<'a>(
+        &self,
+        i: usize,
+        getters: &[&'a dyn GetData<'a>],
+        skip_removes: bool,
+    ) -> DeltaResult<Option<(FileActionKey, bool)>> {
+        // Try to extract an add action by the required path column
+        if let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? {
+            let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
+            return Ok(Some((FileActionKey::new(path, dv_unique_id), true)));
+        }
+
+        // The AddRemoveDedupVisitor skips remove actions when extracting file actions from a
+        // checkpoint batch.
+        if skip_removes {
+            return Ok(None);
+        }
+
+        // Try to extract a remove action by the required path column
+        if let Some(path) = getters[self.remove_path_index].get_str(i, "remove.path")? {
+            let dv_unique_id = self.extract_dv_unique_id(i, getters, self.remove_dv_start_index)?;
+            return Ok(Some((FileActionKey::new(path, dv_unique_id), false)));
+        }
+
+        // No file action found
+        Ok(None)
+    }
+
+    /// Returns whether we are currently processing a log batch.
+    ///
+    /// `true` indicates we are processing a batch from a commit file.
+    /// `false` indicates we are processing a batch from a checkpoint.
+    pub(crate) fn is_log_batch(&self) -> bool {
+        self.is_log_batch
+    }
+}
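For reference, here is a minimal sketch of how a row visitor is expected to drive the new deduplicator. The sketch is not part of this diff; the visitor name, method name, and getter layout are illustrative assumptions modeled on the AddRemoveDedupVisitor changes shown below, and it reuses only the API introduced above (extract_file_action, check_and_record_seen, is_log_batch).

    // Hypothetical visitor: selects only the first (newest) add action per file key.
    struct ExampleDedupVisitor<'seen> {
        deduplicator: FileActionDeduplicator<'seen>,
    }

    impl ExampleDedupVisitor<'_> {
        fn is_selected_row<'a>(
            &mut self,
            i: usize,
            getters: &[&'a dyn GetData<'a>],
        ) -> DeltaResult<bool> {
            // Checkpoint batches carry no remove getters, so removes are skipped there.
            let skip_removes = !self.deduplicator.is_log_batch();
            let Some((key, is_add)) =
                self.deduplicator.extract_file_action(i, getters, skip_removes)?
            else {
                return Ok(false); // the row is not a file action
            };
            // Select the row only if it is an add that no newer batch has already claimed.
            // Removes are still recorded so they suppress older adds for the same key.
            Ok(!self.deduplicator.check_and_record_seen(key) && is_add)
        }
    }

Because the borrowed `seen_file_keys` set outlives any single batch, the same deduplicator state is threaded through every batch of one replay without materializing all actions at once.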
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index 0e26b610f7..37e5044059 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -3,33 +3,19 @@ use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, LazyLock};
 
 use itertools::Itertools;
-use tracing::debug;
 
 use super::data_skipping::DataSkippingFilter;
 use super::{ScanData, Transform};
 use crate::actions::get_log_add_schema;
 use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
 use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef};
+use crate::log_replay::{FileActionDeduplicator, FileActionKey};
 use crate::predicates::{DefaultPredicateEvaluator, PredicateEvaluator as _};
-use crate::scan::{DeletionVectorDescriptor, Scalar, TransformExpr};
+use crate::scan::{Scalar, TransformExpr};
 use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
 use crate::utils::require;
 use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator};
 
-/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
-/// of adds and removes during log replay.
-#[derive(Debug, Hash, Eq, PartialEq)]
-struct FileActionKey {
-    path: String,
-    dv_unique_id: Option<String>,
-}
-impl FileActionKey {
-    fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
-        let path = path.into();
-        Self { path, dv_unique_id }
-    }
-}
-
 struct LogReplayScanner {
     partition_filter: Option<ExpressionRef>,
     data_skipping_filter: Option<DataSkippingFilter>,
@@ -45,43 +31,44 @@ struct LogReplayScanner {
 /// pair, we should ignore all subsequent (older) actions for that same (path, dvId) pair. If the
 /// first action for a given file is a remove, then that file does not show up in the result at all.
 struct AddRemoveDedupVisitor<'seen> {
-    seen: &'seen mut HashSet<FileActionKey>,
+    deduplicator: FileActionDeduplicator<'seen>,
     selection_vector: Vec<bool>,
     logical_schema: SchemaRef,
     transform: Option<Arc<Transform>>,
     partition_filter: Option<ExpressionRef>,
     row_transform_exprs: Vec<Option<ExpressionRef>>,
-    is_log_batch: bool,
 }
 
 impl AddRemoveDedupVisitor<'_> {
-    /// Checks if log replay already processed this logical file (in which case the current action
-    /// should be ignored). If not already seen, register it so we can recognize future duplicates.
-    /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
-    /// and should process it.
-    fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
-        // Note: each (add.path + add.dv_unique_id()) pair has a
-        // unique Add + Remove pair in the log. For example:
-        // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json
-
-        if self.seen.contains(&key) {
-            debug!(
-                "Ignoring duplicate ({}, {:?}) in scan, is log {}",
-                key.path, key.dv_unique_id, self.is_log_batch
-            );
-            true
-        } else {
-            debug!(
-                "Including ({}, {:?}) in scan, is log {}",
-                key.path, key.dv_unique_id, self.is_log_batch
-            );
-            if self.is_log_batch {
-                // Remember file actions from this batch so we can ignore duplicates as we process
-                // batches from older commit and/or checkpoint files. We don't track checkpoint
-                // batches because they are already the oldest actions and never replace anything.
-                self.seen.insert(key);
-            }
-            false
+    // The index positions in the row getters for the following columns
+    const ADD_PATH_INDEX: usize = 0;
+    const ADD_PARTITION_VALUES_INDEX: usize = 1;
+    const ADD_DV_START_INDEX: usize = 2;
+    const REMOVE_PATH_INDEX: usize = 5;
+    const REMOVE_DV_START_INDEX: usize = 6;
+
+    fn new(
+        seen: &mut HashSet<FileActionKey>,
+        selection_vector: Vec<bool>,
+        logical_schema: SchemaRef,
+        transform: Option<Arc<Transform>>,
+        partition_filter: Option<ExpressionRef>,
+        is_log_batch: bool,
+    ) -> AddRemoveDedupVisitor<'_> {
+        AddRemoveDedupVisitor {
+            deduplicator: FileActionDeduplicator::new(
+                seen,
+                is_log_batch,
+                Self::ADD_PATH_INDEX,
+                Self::REMOVE_PATH_INDEX,
+                Self::ADD_DV_START_INDEX,
+                Self::REMOVE_DV_START_INDEX,
+            ),
+            selection_vector,
+            logical_schema,
+            transform,
+            partition_filter,
+            row_transform_exprs: Vec::new(),
         }
     }
@@ -162,28 +149,21 @@ impl AddRemoveDedupVisitor<'_> {
     /// True if this row contains an Add action that should survive log replay. Skip it if the row
     /// is not an Add action, or the file has already been seen previously.
     fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<bool> {
-        // Add will have a path at index 0 if it is valid; otherwise, if it is a log batch, we may
-        // have a remove with a path at index 4. In either case, extract the three dv getters at
-        // indexes that immediately follow a valid path index.
-        let (path, dv_getters, is_add) = if let Some(path) = getters[0].get_str(i, "add.path")? {
-            (path, &getters[2..5], true)
-        } else if !self.is_log_batch {
-            return Ok(false);
-        } else if let Some(path) = getters[5].get_opt(i, "remove.path")? {
-            (path, &getters[6..9], false)
-        } else {
+        // When processing file actions, we extract path and deletion vector information based on
+        // the action type:
+        // - For Add actions: path is at index 0, followed by DV fields at indexes 2-4
+        // - For Remove actions (in log batches only): path is at index 5, followed by DV fields at indexes 6-8
+        // The file extraction logic selects the appropriate indexes based on whether we found a valid path.
+        // Remove getters are not included when visiting a non-log batch (checkpoint batch), so do
+        // not try to extract remove actions in that case.
+        let Some((file_key, is_add)) = self.deduplicator.extract_file_action(
+            i,
+            getters,
+            !self.deduplicator.is_log_batch(), // skip_removes: true if this is a checkpoint batch
+        )?
+        else {
             return Ok(false);
         };
 
-        let dv_unique_id = match dv_getters[0].get_opt(i, "deletionVector.storageType")? {
-            Some(storage_type) => Some(DeletionVectorDescriptor::unique_id_from_parts(
-                storage_type,
-                dv_getters[1].get(i, "deletionVector.pathOrInlineDv")?,
-                dv_getters[2].get_opt(i, "deletionVector.offset")?,
-            )),
-            None => None,
-        };
-
         // Apply partition pruning (to adds only) before deduplication, so that we don't waste memory
         // tracking pruned files. Removes don't get pruned and we'll still have to track them.
         //
@@ -192,7 +172,8 @@ impl AddRemoveDedupVisitor<'_> {
         // encounter if the table's schema was replaced after the most recent checkpoint.
         let partition_values = match &self.transform {
             Some(transform) if is_add => {
-                let partition_values = getters[1].get(i, "add.partitionValues")?;
+                let partition_values =
+                    getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
                 let partition_values = self.parse_partition_values(transform, &partition_values)?;
                 if self.is_file_partition_pruned(&partition_values) {
                     return Ok(false);
@@ -203,8 +184,7 @@
         };
 
         // Check both adds and removes (skipping already-seen), but only transform and return adds
-        let file_key = FileActionKey::new(path, dv_unique_id);
-        if self.check_and_record_seen(file_key) || !is_add {
+        if self.deduplicator.check_and_record_seen(file_key) || !is_add {
             return Ok(false);
         }
         let transform = self
@@ -243,7 +223,7 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
             (names, types).into()
         });
         let (names, types) = NAMES_AND_TYPES.as_ref();
-        if self.is_log_batch {
+        if self.deduplicator.is_log_batch() {
             (names, types)
         } else {
             // All checkpoint actions are already reconciled and Remove actions in checkpoint files
@@ -253,7 +233,8 @@
     }
 
     fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
-        let expected_getters = if self.is_log_batch { 9 } else { 5 };
+        let is_log_batch = self.deduplicator.is_log_batch();
+        let expected_getters = if is_log_batch { 9 } else { 5 };
         require!(
             getters.len() == expected_getters,
             Error::InternalError(format!(
@@ -336,21 +317,23 @@ impl LogReplayScanner {
         };
 
         assert_eq!(selection_vector.len(), actions.len());
-        let mut visitor = AddRemoveDedupVisitor {
-            seen: &mut self.seen,
+        let mut visitor = AddRemoveDedupVisitor::new(
+            &mut self.seen,
             selection_vector,
             logical_schema,
             transform,
-            partition_filter: self.partition_filter.clone(),
-            row_transform_exprs: Vec::new(),
+            self.partition_filter.clone(),
            is_log_batch,
-        };
+        );
         visitor.visit_rows_of(actions)?;
 
         // TODO: Teach expression eval to respect the selection vector we just computed so carefully!
-        let selection_vector = visitor.selection_vector;
         let result = add_transform.evaluate(actions)?;
-        Ok((result, selection_vector, visitor.row_transform_exprs))
+        Ok((
+            result,
+            visitor.selection_vector,
+            visitor.row_transform_exprs,
+        ))
     }
 }
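To make the batch-level flow concrete, here is a simplified sketch of the replay loop that drives the visitor above. It is not part of this diff: the `action_batches` iterator shape, the all-true initial selection vector, and the in-scope `logical_schema`, `transform`, and `partition_filter` variables are illustrative assumptions; the real driver is `LogReplayScanner::process_scan_batch`, which seeds the selection vector from data skipping before visiting.

    // One `seen` set lives for the whole replay; each batch borrows it through a fresh
    // visitor, so a file kept from a newer batch suppresses its older duplicates.
    let mut seen: HashSet<FileActionKey> = HashSet::new();
    for (actions, is_log_batch) in action_batches {
        // `actions: &dyn EngineData`, delivered newest-to-oldest (commits before checkpoints).
        let mut visitor = AddRemoveDedupVisitor::new(
            &mut seen,
            vec![true; actions.len()], // illustrative; the real code starts from data skipping
            logical_schema.clone(),
            transform.clone(),
            partition_filter.clone(),
            is_log_batch,
        );
        visitor.visit_rows_of(actions)?;
        // visitor.selection_vector now marks only the adds that survived dedup and pruning,
        // and visitor.row_transform_exprs carries the per-row transforms for those adds.
    }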