Merged
1 change: 1 addition & 0 deletions kernel/src/lib.rs
@@ -77,6 +77,7 @@ pub mod actions;
pub mod engine_data;
pub mod error;
pub mod expressions;
pub mod log_replay;
Collaborator:
just noticing (sorry this snuck through somehow) this shouldn't be pub

pub mod scan;
pub mod schema;
pub mod snapshot;
182 changes: 182 additions & 0 deletions kernel/src/log_replay.rs
@@ -0,0 +1,182 @@
//! This module provides structures and functionality to facilitate the log replay process.
use std::collections::HashSet;

use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::engine_data::{GetData, TypedGetData};
use crate::DeltaResult;

use tracing::debug;

/// The subset of file action fields that uniquely identifies a file action in the log, used for
/// deduplication of adds and removes during log replay.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) struct FileActionKey {
Collaborator Author:
Moved from scan/log_replay.rs

pub(crate) path: String,
pub(crate) dv_unique_id: Option<String>,
}
impl FileActionKey {
pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
let path = path.into();
Self { path, dv_unique_id }
}
}

/// Maintains state and provides functionality for deduplicating file actions during log replay.
///
/// This struct is embedded in visitors to track which files have been seen across multiple
/// log batches. Since logs are processed newest-to-oldest, this deduplicator ensures that each
/// unique file (identified by path and deletion vector ID) is processed only once. Performing
/// deduplication at the visitor level avoids having to load all actions into memory at once,
/// significantly reducing memory usage for large Delta tables with extensive history.
pub(crate) struct FileActionDeduplicator<'seen> {
/// A set of (data file path, dv_unique_id) pairs that have been seen thus
/// far in the log for deduplication. This is a mutable reference to the set
/// of seen file keys that persists across multiple log batches.
seen_file_keys: &'seen mut HashSet<FileActionKey>,
/// Selection vector to track which rows should be included
selection_vector: Vec<bool>,
/// Whether we're processing a log batch (as opposed to a checkpoint)
is_log_batch: bool,
/// Index of the getter containing the add.path column
add_path_index: usize,
/// Index of the getter containing the remove.path column
remove_path_index: usize,
/// Starting index for add action deletion vector columns
add_dv_start_index: usize,
/// Starting index for remove action deletion vector columns
remove_dv_start_index: usize,
}

impl<'seen> FileActionDeduplicator<'seen> {
pub(crate) fn new(
seen_file_keys: &'seen mut HashSet<FileActionKey>,
selection_vector: Vec<bool>,
is_log_batch: bool,
Collaborator:
thinking out loud: we could actually make this something like "deduplicate_batch: bool" or "save_batch: bool"? baking in less checkpoint/commit-specific logic and instead just communicating whether or not we should save the file actions in the hashmap for future deduplication (i.e. we do this in commit batches but not checkpoint batches)

let's not solve that here, perhaps talk about a little rename in the future

Collaborator Author:
Left a todo:
// TODO: Consider renaming to `is_commit_batch`, `deduplicate_batch`, or `save_batch` to better reflect its role in deduplication logic.
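A hedged micro-sketch (editorial, not in this PR) of the semantic mapping behind the suggested names; `save_batch` is hypothetical:

// Hypothetical: same flag, named for intent. true => save this batch's file
// actions for future deduplication (commit batches); false => don't bother
// (checkpoint batches are already the oldest actions and never replace anything).
let save_batch: bool = is_log_batch;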

add_path_index: usize,
remove_path_index: usize,
add_dv_start_index: usize,
remove_dv_start_index: usize,
) -> Self {
Self {
seen_file_keys,
selection_vector,
is_log_batch,
add_path_index,
remove_path_index,
add_dv_start_index,
remove_dv_start_index,
}
}

/// Checks if log replay already processed this logical file (in which case the current action
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
/// and should process it.
pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
Collaborator:
can we call out what is just a simple move and what is a material change? AFAICT this is code that already existed?

Collaborator Author:
Directly extracted from AddRemoveDedupVisitor

// Note: each (add.path + add.dv_unique_id()) pair has a
// unique Add + Remove pair in the log. For example:
// https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json

if self.seen_file_keys.contains(&key) {
debug!(
"Ignoring duplicate ({}, {:?}) in scan, is log {}",
key.path, key.dv_unique_id, self.is_log_batch
);
true
} else {
debug!(
"Including ({}, {:?}) in scan, is log {}",
key.path, key.dv_unique_id, self.is_log_batch
);
if self.is_log_batch {
// Remember file actions from this batch so we can ignore duplicates as we process
// batches from older commit and/or checkpoint files. We don't track checkpoint
// batches because they are already the oldest actions and never replace anything.
self.seen_file_keys.insert(key);
}
false
}
}
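
A hedged illustration (editorial, not part of the diff) of the contract above: the first sighting of a key in a commit batch is recorded and returns false; a later duplicate returns true. The path and getter indices below are placeholders:

let mut seen = HashSet::new();
// Getter indices are arbitrary here; this method never consults the getters.
let mut dedup = FileActionDeduplicator::new(&mut seen, vec![true; 2], true, 0, 1, 2, 5);
let key = || FileActionKey::new("part-00000.parquet", None);
assert!(!dedup.check_and_record_seen(key())); // first sighting: process it
assert!(dedup.check_and_record_seen(key())); // duplicate: ignore it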

/// Extract the deletion vector unique ID if it exists.
fn extract_dv_unique_id<'a>(
Collaborator Author (@sebastiantia, Mar 27, 2025):
An internal refactor, only called from extract_file_action to generate the file action key from a row of EngineData

Collaborator:
can we document some of the requirements here (e.g. you give a dv_start_index and then it uses getters from that index, + 1, and + 2 for extracting the storage type, dv, and offset fields)
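
A possible doc-comment sketch addressing this (editorial, not part of the diff), grounded in the getter accesses in the body below:

/// # Getter layout
///
/// Assumes `getters[dv_start_index]` yields `deletionVector.storageType`,
/// `getters[dv_start_index + 1]` yields `deletionVector.pathOrInlineDv`, and
/// `getters[dv_start_index + 2]` yields `deletionVector.offset`.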

&self,
i: usize,
getters: &[&'a dyn GetData<'a>],
dv_start_index: usize,
) -> DeltaResult<Option<String>> {
match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? {
Some(storage_type) => {
let path_or_inline =
getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;

Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
storage_type,
path_or_inline,
offset,
)))
}
None => Ok(None),
}
}

/// Extracts a file action key and determines if it's an add operation.
/// This method examines the data at the given index using the provided getters
/// to identify whether a file action exists and what type it is.
///
/// # Arguments
///
/// * `i` - Index position in the data structure to examine
/// * `getters` - Collection of data getter implementations used to access the data
/// * `skip_removes` - Whether to skip remove actions when extracting file actions
///
/// # Returns
///
/// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
Member:
why not move the is_add inside FileActionKey? Even more rusty would be to have a quick:

enum FileActionKeyType {
  Add,
  Remove,
}

and have that be part of the above struct.

If you want to avoid code changes in a refactor that makes sense, maybe just make a follow-up issue?

Collaborator Author:
Good idea 👍 will track with a follow-up issue
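
For reference, a hedged sketch of how that suggestion might look (names hypothetical, to be settled in the follow-up):

enum FileActionType {
    Add,
    Remove,
}

pub(crate) struct FileActionKey {
    pub(crate) path: String,
    pub(crate) dv_unique_id: Option<String>,
    // Hypothetical: replaces the separate `is_add` bool returned alongside the key.
    pub(crate) action_type: FileActionType,
}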


/// * `Ok(None)` - When no file action is found
/// * `Err(...)` - On any error during extraction
pub(crate) fn extract_file_action<'a>(
Collaborator:
did we not have any specific UT on these in the old module? might be worth introducing a handful of quick tests here..

Collaborator Author (@sebastiantia, Mar 27, 2025):
the original module didn't have specific unit tests for these functions, as they were primarily covered by integration tests at a higher level. Since this PR is focused purely on refactoring for code organization without functional changes, I'd prefer to keep tests separate to maintain a clear scope for reviewers.

Collaborator:
sure - can we track a follow-up then?

Collaborator Author:
👍 #780

@snackoverflow215:
Hello @sebastiantia and @zachschuermann, I am new to the open source community. I was told that the Rust community is very supportive and helpful. I was wondering if I could try to write these tests. I joined the Slack channel, but the last activity there was 12 March.

Collaborator:
👋 Hi @snackoverflow215! We certainly aim to be supportive and helpful :) - please feel free to add some tests for this, that would be great! I saw you opened #783 already, I'll take a look today

&self,
i: usize,
getters: &[&'a dyn GetData<'a>],
skip_removes: bool,
) -> DeltaResult<Option<(FileActionKey, bool)>> {
// Try to extract an add action by the required path column
if let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? {
let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
return Ok(Some((FileActionKey::new(path, dv_unique_id), true)));
}

// The AddRemoveDedupVisitor skips remove actions when extracting file actions from a checkpoint batch.
if skip_removes {
return Ok(None);
}

// Try to extract a remove action by the required path column
if let Some(path) = getters[self.remove_path_index].get_str(i, "remove.path")? {
let dv_unique_id = self.extract_dv_unique_id(i, getters, self.remove_dv_start_index)?;
return Ok(Some((FileActionKey::new(path, dv_unique_id), false)));
}

// No file action found
Ok(None)
}

pub(crate) fn selection_vector(self) -> Vec<bool> {
self.selection_vector
}

pub(crate) fn selection_vector_ref(&self) -> &Vec<bool> {
&self.selection_vector
}

pub(crate) fn selection_vector_mut(&mut self) -> &mut Vec<bool> {
&mut self.selection_vector
}

/// Returns whether we are currently processing a log batch.
pub(crate) fn is_log_batch(&self) -> bool {
self.is_log_batch
}
}
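
As an editorial aside, a minimal usage sketch of how a row visitor might drive the deduplicator (illustrative only; `visit_rows`, `row_count`, and the deselect-on-duplicate policy are assumptions, though the latter matches the selection-vector semantics described above):

// Illustrative only: deselect any row whose (path, dv_unique_id) key was
// already seen in a newer batch; skip removes entirely in checkpoint batches.
fn visit_rows<'a>(
    dedup: &mut FileActionDeduplicator<'_>,
    row_count: usize,
    getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<()> {
    let skip_removes = !dedup.is_log_batch();
    for i in 0..row_count {
        if let Some((key, _is_add)) = dedup.extract_file_action(i, getters, skip_removes)? {
            if dedup.check_and_record_seen(key) {
                dedup.selection_vector_mut()[i] = false;
            }
        }
    }
    Ok(())
}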