
Commit 06fb373

refactor: extract deduplication logic from AddRemoveDedupVisitor into embeddable FileActionsDeduplicator (#769)
## What changes are proposed in this pull request?

**No behavioral changes were introduced; this is purely a refactoring effort.**

This PR extracts the core deduplication logic from the `AddRemoveDedupVisitor` so it can be shared with the incoming `CheckpointVisitor`. For a bigger-picture view of how this refactor helps, see the PR that implements the `CheckpointVisitor` with an embedded `FileActionDeduplicator`; it will be rebased onto this PR once merged. [[link to PR]](#738)

The `FileActionDeduplicator` lives in the new top-level `log_replay` mod because it will be leveraged by both the nested `scan/log_replay` mod and the incoming `checkpoints/log_replay` mod. Additional traits and structs that the two `log_replay` implementations will share will also go through this new top-level mod. For an even wider view of the `checkpoints` mod and the component re-use, see the following PR. [[link to PR]](#744)

## Summary of refactor

1. New `log_replay` mod
2. Moved the `FileActionKey` definition from `scan/log_replay` to the new `log_replay` mod
3. New `FileActionDeduplicator` in the new `log_replay` mod
   - Includes the `check_and_record_seen` method, which was simply **moved** from the `AddRemoveDedupVisitor`
   - Includes the `extract_file_action` method and the private `extract_dv_unique_id` method; these are new as named methods, but their functionality was pulled out of the `AddRemoveDedupVisitor` so it can be shared with the incoming `CheckpointVisitor`

## How was this change tested?

All existing tests pass ✅
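As a rough illustration of how a visitor embeds the extracted pieces, the sketch below shows a per-row decision built from `extract_file_action` and `check_and_record_seen`. The helper name `keep_row`, the skip-removes rule, and the keep-only-unseen-adds policy are illustrative assumptions modeled on the scan use case, not the exact `AddRemoveDedupVisitor` code.

```rust
use crate::engine_data::GetData;
use crate::log_replay::FileActionDeduplicator;
use crate::DeltaResult;

/// Hypothetical per-row helper (crate-internal, since the deduplicator is `pub(crate)`).
/// Returns `true` if the row is an add action that has not been seen in a newer batch.
fn keep_row<'a>(
    dedup: &mut FileActionDeduplicator<'_>,
    i: usize,
    getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<bool> {
    // Mirror the scan behavior: remove actions only matter in commit batches, so skip
    // them entirely when this deduplicator was built for a checkpoint batch.
    let skip_removes = !dedup.is_log_batch();
    match dedup.extract_file_action(i, getters, skip_removes)? {
        // Record the key (for commit batches) and keep the row only if it is an
        // add action we have not already seen.
        Some((key, is_add)) => Ok(!dedup.check_and_record_seen(key) && is_add),
        // Not a file action at all: nothing to keep or record.
        None => Ok(false),
    }
}
```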
1 parent 9290930 commit 06fb373

File tree

3 files changed

+252
-76
lines changed

3 files changed

+252
-76
lines changed

kernel/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ pub mod actions;
 pub mod engine_data;
 pub mod error;
 pub mod expressions;
+pub mod log_replay;
 pub mod scan;
 pub mod schema;
 pub mod snapshot;

kernel/src/log_replay.rs

Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
//! This module provides log replay utilities.
//!
//! Log replay is the process of transforming an iterator of action batches (read from Delta
//! transaction logs) into an iterator of filtered/transformed actions for specific use cases.
//! The logs, which record all table changes as JSON entries, are processed batch by batch,
//! typically from newest to oldest.
//!
//! Log replay is currently implemented for table scans, which filter and apply transformations
//! to produce file actions which builds the view of the table state at a specific point in time.
//! Future extensions will support additional log replay processors beyond the current use case.
//! (e.g. checkpointing: filter actions to include only those needed to rebuild table state)
//!
//! This module provides structures for efficient batch processing, focusing on file action
//! deduplication with `FileActionDeduplicator` which tracks unique files across log batches
//! to minimize memory usage for tables with extensive history.

use std::collections::HashSet;

use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::engine_data::{GetData, TypedGetData};
use crate::DeltaResult;

use tracing::debug;

/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
/// of adds and removes during log replay.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) struct FileActionKey {
    pub(crate) path: String,
    pub(crate) dv_unique_id: Option<String>,
}
impl FileActionKey {
    pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
        let path = path.into();
        Self { path, dv_unique_id }
    }
}

/// Maintains state and provides functionality for deduplicating file actions during log replay.
///
/// This struct is embedded in visitors to track which files have been seen across multiple
/// log batches. Since logs are processed newest-to-oldest, this deduplicator ensures that each
/// unique file (identified by path and deletion vector ID) is processed only once. Performing
/// deduplication at the visitor level avoids having to load all actions into memory at once,
/// significantly reducing memory usage for large Delta tables with extensive history.
pub(crate) struct FileActionDeduplicator<'seen> {
    /// A set of (data file path, dv_unique_id) pairs that have been seen thus
    /// far in the log for deduplication. This is a mutable reference to the set
    /// of seen file keys that persists across multiple log batches.
    seen_file_keys: &'seen mut HashSet<FileActionKey>,
    // TODO: Consider renaming to `is_commit_batch`, `deduplicate_batch`, or `save_batch`
    // to better reflect its role in deduplication logic.
    /// Whether we're processing a log batch (as opposed to a checkpoint)
    is_log_batch: bool,
    /// Index of the getter containing the add.path column
    add_path_index: usize,
    /// Index of the getter containing the remove.path column
    remove_path_index: usize,
    /// Starting index for add action deletion vector columns
    add_dv_start_index: usize,
    /// Starting index for remove action deletion vector columns
    remove_dv_start_index: usize,
}

impl<'seen> FileActionDeduplicator<'seen> {
    pub(crate) fn new(
        seen_file_keys: &'seen mut HashSet<FileActionKey>,
        is_log_batch: bool,
        add_path_index: usize,
        remove_path_index: usize,
        add_dv_start_index: usize,
        remove_dv_start_index: usize,
    ) -> Self {
        Self {
            seen_file_keys,
            is_log_batch,
            add_path_index,
            remove_path_index,
            add_dv_start_index,
            remove_dv_start_index,
        }
    }

    /// Checks if log replay already processed this logical file (in which case the current action
    /// should be ignored). If not already seen, register it so we can recognize future duplicates.
    /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
    /// and should process it.
    pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
        // Note: each (add.path + add.dv_unique_id()) pair has a
        // unique Add + Remove pair in the log. For example:
        // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json

        if self.seen_file_keys.contains(&key) {
            debug!(
                "Ignoring duplicate ({}, {:?}) in scan, is log {}",
                key.path, key.dv_unique_id, self.is_log_batch
            );
            true
        } else {
            debug!(
                "Including ({}, {:?}) in scan, is log {}",
                key.path, key.dv_unique_id, self.is_log_batch
            );
            if self.is_log_batch {
                // Remember file actions from this batch so we can ignore duplicates as we process
                // batches from older commit and/or checkpoint files. We don't track checkpoint
                // batches because they are already the oldest actions and never replace anything.
                self.seen_file_keys.insert(key);
            }
            false
        }
    }

    /// Extracts the deletion vector unique ID if it exists.
    ///
    /// This function retrieves the necessary fields for constructing a deletion vector unique ID
    /// by accessing `getters` at `dv_start_index` and the following two indices. Specifically:
    /// - `dv_start_index` retrieves the storage type (`deletionVector.storageType`).
    /// - `dv_start_index + 1` retrieves the path or inline deletion vector (`deletionVector.pathOrInlineDv`).
    /// - `dv_start_index + 2` retrieves the optional offset (`deletionVector.offset`).
    fn extract_dv_unique_id<'a>(
        &self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
        dv_start_index: usize,
    ) -> DeltaResult<Option<String>> {
        match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? {
            Some(storage_type) => {
                let path_or_inline =
                    getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?;
                let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?;

                Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
                    storage_type,
                    path_or_inline,
                    offset,
                )))
            }
            None => Ok(None),
        }
    }

    /// Extracts a file action key and determines if it's an add operation.
    /// This method examines the data at the given index using the provided getters
    /// to identify whether a file action exists and what type it is.
    ///
    /// # Arguments
    ///
    /// * `i` - Index position in the data structure to examine
    /// * `getters` - Collection of data getter implementations used to access the data
    /// * `skip_removes` - Whether to skip remove actions when extracting file actions
    ///
    /// # Returns
    ///
    /// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
    /// * `Ok(None)` - When no file action is found
    /// * `Err(...)` - On any error during extraction
    pub(crate) fn extract_file_action<'a>(
        &self,
        i: usize,
        getters: &[&'a dyn GetData<'a>],
        skip_removes: bool,
    ) -> DeltaResult<Option<(FileActionKey, bool)>> {
        // Try to extract an add action by the required path column
        if let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? {
            let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?;
            return Ok(Some((FileActionKey::new(path, dv_unique_id), true)));
        }

        // The AddRemoveDedupVisitor skips remove actions when extracting file actions from a checkpoint batch.
        if skip_removes {
            return Ok(None);
        }

        // Try to extract a remove action by the required path column
        if let Some(path) = getters[self.remove_path_index].get_str(i, "remove.path")? {
            let dv_unique_id = self.extract_dv_unique_id(i, getters, self.remove_dv_start_index)?;
            return Ok(Some((FileActionKey::new(path, dv_unique_id), false)));
        }

        // No file action found
        Ok(None)
    }

    /// Returns whether we are currently processing a log batch.
    ///
    /// `true` indicates we are processing a batch from a commit file.
    /// `false` indicates we are processing a batch from a checkpoint.
    pub(crate) fn is_log_batch(&self) -> bool {
        self.is_log_batch
    }
}
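For context, here is a minimal, hypothetical sketch of wiring the deduplicator across batches: a single `seen_file_keys` set outlives each per-batch `FileActionDeduplicator`, which is what lets keys recorded from newer commit batches suppress duplicates in older commit and checkpoint batches. The `replay_batches` driver and the getter indices are placeholders, since the real indices depend on the column ordering each visitor requests.

```rust
use std::collections::HashSet;

use crate::engine_data::GetData;
use crate::log_replay::{FileActionDeduplicator, FileActionKey};
use crate::DeltaResult;

/// Hypothetical driver (crate-internal, since the types are `pub(crate)`).
/// Each batch is (row_count, getters, is_log_batch); the indices below are placeholders.
fn replay_batches<'a>(batches: &[(usize, &[&'a dyn GetData<'a>], bool)]) -> DeltaResult<()> {
    // One set shared across all batches: this is the deduplication state.
    let mut seen_file_keys: HashSet<FileActionKey> = HashSet::new();
    for &(row_count, getters, is_log_batch) in batches {
        // A fresh deduplicator per batch mutably borrows the shared set.
        let mut dedup = FileActionDeduplicator::new(
            &mut seen_file_keys,
            is_log_batch,
            /* add_path_index */ 0,
            /* remove_path_index */ 4,
            /* add_dv_start_index */ 1,
            /* remove_dv_start_index */ 5,
        );
        for i in 0..row_count {
            // Mirror the scan: checkpoint batches never contribute remove actions.
            if let Some((key, _is_add)) = dedup.extract_file_action(i, getters, !is_log_batch)? {
                // Recording keys from commit batches is what suppresses older duplicates.
                let _already_seen = dedup.check_and_record_seen(key);
            }
        }
    }
    Ok(())
}
```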
