-
Notifications
You must be signed in to change notification settings - Fork 139
refactor: extract deduplication logic from AddRemoveDedupVisitor into embeddable FileActionDeduplicator
#769
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
a618833
7da74b2
220a216
9d86911
e5b0e32
a5393dc
a23c651
d712d18
e564ae1
b14ff19
a52d484
a243a98
9f06382
58f38c0
964f294
e982960
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| //! This module provides log replay utilities. | ||
| //! | ||
| //! Log replay is the process of transforming an iterator of action batches (read from Delta | ||
| //! transaction logs) into an iterator of filtered/transformed actions for specific use cases. | ||
| //! The logs, which record all table changes as JSON entries, are processed batch by batch, | ||
| //! typically from newest to oldest. | ||
| //! | ||
| //! Log replay is currently implemented for table scans, which filter and apply transformations | ||
| //! to produce file actions which build the view of the table state at a specific point in time. | ||
| //! Future extensions will support additional log replay processors beyond the current use case. | ||
| //! (e.g. checkpointing: filter actions to include only those needed to rebuild table state) | ||
| //! | ||
| //! This module provides structures for efficient batch processing, focusing on file action | ||
| //! deduplication with `FileActionDeduplicator` which tracks unique files across log batches | ||
| //! to minimize memory usage for tables with extensive history. | ||
| use std::collections::HashSet; | ||
|
|
||
| use crate::actions::deletion_vector::DeletionVectorDescriptor; | ||
| use crate::engine_data::{GetData, TypedGetData}; | ||
| use crate::DeltaResult; | ||
|
|
||
| use tracing::debug; | ||
|
|
||
| /// The subset of file action fields that uniquely identifies it in the log, used for deduplication | ||
| /// of adds and removes during log replay. | ||
| #[derive(Debug, Hash, Eq, PartialEq)] | ||
| pub(crate) struct FileActionKey { | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved from |
||
| pub(crate) path: String, | ||
| pub(crate) dv_unique_id: Option<String>, | ||
| } | ||
| impl FileActionKey { | ||
| pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self { | ||
| let path = path.into(); | ||
| Self { path, dv_unique_id } | ||
| } | ||
| } | ||
|
|
||
| /// Maintains state and provides functionality for deduplicating file actions during log replay. | ||
| /// | ||
| /// This struct is embedded in visitors to track which files have been seen across multiple | ||
| /// log batches. Since logs are processed newest-to-oldest, this deduplicator ensures that each | ||
sebastiantia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// unique file (identified by path and deletion vector ID) is processed only once. Performing | ||
| /// deduplication at the visitor level avoids having to load all actions into memory at once, | ||
| /// significantly reducing memory usage for large Delta tables with extensive history. | ||
| pub(crate) struct FileActionDeduplicator<'seen> { | ||
| /// A set of (data file path, dv_unique_id) pairs that have been seen thus | ||
| /// far in the log for deduplication. This is a mutable reference to the set | ||
| /// of seen file keys that persists across multiple log batches. | ||
| seen_file_keys: &'seen mut HashSet<FileActionKey>, | ||
| /// Whether we're processing a log batch (as opposed to a checkpoint) | ||
| is_log_batch: bool, | ||
sebastiantia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// Index of the getter containing the add.path column | ||
| add_path_index: usize, | ||
| /// Index of the getter containing the remove.path column | ||
| remove_path_index: usize, | ||
| /// Starting index for add action deletion vector columns | ||
| add_dv_start_index: usize, | ||
| /// Starting index for remove action deletion vector columns | ||
| remove_dv_start_index: usize, | ||
| } | ||
|
|
||
| impl<'seen> FileActionDeduplicator<'seen> { | ||
| pub(crate) fn new( | ||
| seen_file_keys: &'seen mut HashSet<FileActionKey>, | ||
| is_log_batch: bool, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thinking out loud: we could actually make this something like "deduplicate_batch: bool" or "save_batch: bool" ? baking in less checkpoint/commit specific logic and instead just communicating whether or not we should save the file actions in the hashmap for future deduplication (i.e. we do this in commit batches but not checkpoint batches) let's not solve that here, perhaps talk about little rename in the future
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Left a todo... |
||
| add_path_index: usize, | ||
| remove_path_index: usize, | ||
| add_dv_start_index: usize, | ||
| remove_dv_start_index: usize, | ||
| ) -> Self { | ||
| Self { | ||
| seen_file_keys, | ||
| is_log_batch, | ||
| add_path_index, | ||
| remove_path_index, | ||
| add_dv_start_index, | ||
| remove_dv_start_index, | ||
| } | ||
| } | ||
|
|
||
| /// Checks if log replay already processed this logical file (in which case the current action | ||
| /// should be ignored). If not already seen, register it so we can recognize future duplicates. | ||
| /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it | ||
| /// and should process it. | ||
| pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we call out what is just a simple move and what is a material change? AFAICT this is code that already existed?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Directly extracted from |
||
| // Note: each (add.path + add.dv_unique_id()) pair has a | ||
| // unique Add + Remove pair in the log. For example: | ||
| // https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json | ||
|
|
||
| if self.seen_file_keys.contains(&key) { | ||
| debug!( | ||
| "Ignoring duplicate ({}, {:?}) in scan, is log {}", | ||
| key.path, key.dv_unique_id, self.is_log_batch | ||
| ); | ||
| true | ||
| } else { | ||
| debug!( | ||
| "Including ({}, {:?}) in scan, is log {}", | ||
| key.path, key.dv_unique_id, self.is_log_batch | ||
| ); | ||
| if self.is_log_batch { | ||
| // Remember file actions from this batch so we can ignore duplicates as we process | ||
| // batches from older commit and/or checkpoint files. We don't track checkpoint | ||
| // batches because they are already the oldest actions and never replace anything. | ||
| self.seen_file_keys.insert(key); | ||
| } | ||
| false | ||
| } | ||
| } | ||
|
|
||
| /// Extract the deletion vector unique ID if it exists. | ||
| fn extract_dv_unique_id<'a>( | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An internal refactor, only called from
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we document some of the requirements here (e.g. you give a dv_start_index and then it uses getters at that index, index + 1, and index + 2 for extracting the storage type, dv, and offset fields.) |
||
| &self, | ||
| i: usize, | ||
| getters: &[&'a dyn GetData<'a>], | ||
| dv_start_index: usize, | ||
| ) -> DeltaResult<Option<String>> { | ||
| match getters[dv_start_index].get_opt(i, "deletionVector.storageType")? { | ||
| Some(storage_type) => { | ||
| let path_or_inline = | ||
| getters[dv_start_index + 1].get(i, "deletionVector.pathOrInlineDv")?; | ||
| let offset = getters[dv_start_index + 2].get_opt(i, "deletionVector.offset")?; | ||
|
|
||
| Ok(Some(DeletionVectorDescriptor::unique_id_from_parts( | ||
| storage_type, | ||
| path_or_inline, | ||
| offset, | ||
| ))) | ||
| } | ||
| None => Ok(None), | ||
| } | ||
| } | ||
|
|
||
| /// Extracts a file action key and determines if it's an add operation. | ||
| /// This method examines the data at the given index using the provided getters | ||
| /// to identify whether a file action exists and what type it is. | ||
| /// | ||
| /// # Arguments | ||
| /// | ||
| /// * `i` - Index position in the data structure to examine | ||
| /// * `getters` - Collection of data getter implementations used to access the data | ||
| /// * `skip_removes` - Whether to skip remove actions when extracting file actions | ||
| /// | ||
| /// # Returns | ||
| /// | ||
| /// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not move the enum FileActionKeyType {
Add,
Remove,
} and have that be part of the above struct. If you want to avoid code changes in a refactor that makes sense, maybe just make a follow-up issue?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea 👍 will track with a follow-up issue
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| /// * `Ok(None)` - When no file action is found | ||
| /// * `Err(...)` - On any error during extraction | ||
| pub(crate) fn extract_file_action<'a>( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. did we not have any specific UT on these in the old module? might be worth introducing a handful of quick tests here..
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the original module didn't have specific unit tests for these functions, as they were primarily covered by integration tests at a higher level. Since this PR is focused purely on refactoring for code organization without functional changes, I'd prefer to keep tests separate to maintain a clear scope for reviewers.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure - can we track a follow-up then?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 #780 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hello @sebastiantia and @zachschuermann, I am new to the open source community. I was told that Rust community is very supportive and helpful. I was wondering if I could try to write these tests. I joined the slack channel but the last activity there was 12 March.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👋 Hi @snackoverflow215! We certainly aim to be supportive and helpful :) - please feel free to add some tests for this, that would be great! I saw you opened #783 already, I'll take a look today |
||
| &self, | ||
| i: usize, | ||
| getters: &[&'a dyn GetData<'a>], | ||
| skip_removes: bool, | ||
| ) -> DeltaResult<Option<(FileActionKey, bool)>> { | ||
| // Try to extract an add action by the required path column | ||
| if let Some(path) = getters[self.add_path_index].get_str(i, "add.path")? { | ||
| let dv_unique_id = self.extract_dv_unique_id(i, getters, self.add_dv_start_index)?; | ||
| return Ok(Some((FileActionKey::new(path, dv_unique_id), true))); | ||
| } | ||
|
|
||
| // The AddRemoveDedupVisitor skips remove actions when extracting file actions from a checkpoint batch. | ||
| if skip_removes { | ||
| return Ok(None); | ||
| } | ||
|
|
||
| // Try to extract a remove action by the required path column | ||
| if let Some(path) = getters[self.remove_path_index].get_str(i, "remove.path")? { | ||
| let dv_unique_id = self.extract_dv_unique_id(i, getters, self.remove_dv_start_index)?; | ||
| return Ok(Some((FileActionKey::new(path, dv_unique_id), false))); | ||
| } | ||
|
|
||
| // No file action found | ||
| Ok(None) | ||
| } | ||
|
|
||
| /// Returns whether we are currently processing a log batch. | ||
sebastiantia marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pub(crate) fn is_log_batch(&self) -> bool { | ||
| self.is_log_batch | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just noticing (sorry this snuck through somehow) this shouldn't be
pub