Skip to content

Commit 8cf054b

Browse files
committed
fix merge
1 parent a753cb0 commit 8cf054b

File tree

2 files changed

+22
-209
lines changed

2 files changed

+22
-209
lines changed

kernel/src/actions/visitors.rs

+22-208
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
44
use std::collections::{HashMap, HashSet};
55
use std::sync::LazyLock;
6-
use tracing::debug;
76

87
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
8+
use crate::log_replay::{FileActionDeduplicator, FileActionKey};
99
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
1010
use crate::utils::require;
1111
use crate::{DeltaResult, Error};
@@ -516,6 +516,12 @@ pub(crate) struct CheckpointVisitor<'seen> {
516516

517517
#[allow(unused)]
518518
impl CheckpointVisitor<'_> {
519+
// The index position in the row getters for the following columns
520+
const ADD_PATH_INDEX: usize = 0;
521+
const ADD_DV_START_INDEX: usize = 1;
522+
const REMOVE_PATH_INDEX: usize = 4;
523+
const REMOVE_DV_START_INDEX: usize = 6;
524+
519525
/// Create a new CheckpointVisitor
520526
fn new<'seen>(
521527
seen_file_keys: &'seen mut HashSet<FileActionKey>,
@@ -529,6 +535,10 @@ impl CheckpointVisitor<'_> {
529535
seen_file_keys,
530536
selection_vector,
531537
is_log_batch,
538+
Self::ADD_PATH_INDEX,
539+
Self::REMOVE_PATH_INDEX,
540+
Self::ADD_DV_START_INDEX,
541+
Self::REMOVE_DV_START_INDEX,
532542
),
533543
total_file_actions: 0,
534544
total_add_actions: 0,
@@ -563,17 +573,13 @@ impl CheckpointVisitor<'_> {
563573
i: usize,
564574
getters: &[&'a dyn GetData<'a>],
565575
) -> DeltaResult<bool> {
566-
// Extract file action key and determine if it's an add operation
567-
let Some((file_key, is_add)) = self.deduplicator.extract_file_action(
568-
i,
569-
getters,
570-
// Do not skip remove actions (even if we're processing a log batch)
571-
FileActionExtractConfig::new(0, 4, 1, 6, false),
572-
)?
576+
// Never skip remove actions, as they may be unexpired tombstones.
577+
let Some((file_key, is_add)) = self.deduplicator.extract_file_action(i, getters, false)?
573578
else {
574579
return Ok(false);
575580
};
576581

582+
// Check if we've already seen this file action
577583
if self.deduplicator.check_and_record_seen(file_key) {
578584
return Ok(false);
579585
}
@@ -703,198 +709,6 @@ impl RowVisitor for CheckpointVisitor<'_> {
703709
}
704710
}
705711

706-
/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
707-
/// of adds and removes during log replay.
708-
#[derive(Debug, Hash, Eq, PartialEq)]
709-
pub(crate) struct FileActionKey {
710-
pub(crate) path: String,
711-
pub(crate) dv_unique_id: Option<String>,
712-
}
713-
impl FileActionKey {
714-
pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
715-
let path = path.into();
716-
Self { path, dv_unique_id }
717-
}
718-
}
719-
720-
/// This struct contains indices and configuration options needed to
721-
/// extract file actions from action batches in the Delta log.
722-
pub(crate) struct FileActionExtractConfig {
723-
/// Index of the getter containing the add.path column
724-
pub add_path_index: usize,
725-
/// Index of the getter containing the remove.path column
726-
pub remove_path_index: usize,
727-
/// Starting index for add action deletion vector columns
728-
pub add_dv_start_index: usize,
729-
/// Starting index for remove action deletion vector columns
730-
pub remove_dv_start_index: usize,
731-
/// Whether to skip remove actions when extracting file actions
732-
pub skip_removes: bool,
733-
}
734-
735-
impl FileActionExtractConfig {
736-
pub(crate) fn new(
737-
add_path_index: usize,
738-
remove_path_index: usize,
739-
add_dv_start_index: usize,
740-
remove_dv_start_index: usize,
741-
skip_removes: bool,
742-
) -> Self {
743-
Self {
744-
add_path_index,
745-
remove_path_index,
746-
add_dv_start_index,
747-
remove_dv_start_index,
748-
skip_removes,
749-
}
750-
}
751-
}
752-
753-
/// Core implementation for deduplicating file actions in Delta log replay
754-
/// This struct extracts the common functionality from the CheckpointVisitor
755-
/// and the AddRemoveDedupVisitor.
756-
pub(crate) struct FileActionDeduplicator<'seen> {
757-
/// A set of (data file path, dv_unique_id) pairs that have been seen thus
758-
/// far in the log for deduplication
759-
seen_file_keys: &'seen mut HashSet<FileActionKey>,
760-
/// Selection vector to track which rows should be included
761-
selection_vector: Vec<bool>,
762-
/// Whether we're processing a log batch (as opposed to a checkpoint)
763-
is_log_batch: bool,
764-
}
765-
766-
impl<'seen> FileActionDeduplicator<'seen> {
767-
pub(crate) fn new(
768-
seen_file_keys: &'seen mut HashSet<FileActionKey>,
769-
selection_vector: Vec<bool>,
770-
is_log_batch: bool,
771-
) -> Self {
772-
Self {
773-
seen_file_keys,
774-
selection_vector,
775-
is_log_batch,
776-
}
777-
}
778-
779-
/// Checks if log replay already processed this logical file (in which case the current action
780-
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
781-
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
782-
/// and should process it.
783-
pub(crate) fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
784-
// Note: each (add.path + add.dv_unique_id()) pair has a
785-
// unique Add + Remove pair in the log. For example:
786-
// https://github.com/delta-io/delta/blob/master/spark/src/test/resources/delta/table-with-dv-large/_delta_log/00000000000000000001.json
787-
788-
if self.seen_file_keys.contains(&key) {
789-
debug!(
790-
"Ignoring duplicate ({}, {:?}) in scan, is log {}",
791-
key.path, key.dv_unique_id, self.is_log_batch
792-
);
793-
true
794-
} else {
795-
debug!(
796-
"Including ({}, {:?}) in scan, is log {}",
797-
key.path, key.dv_unique_id, self.is_log_batch
798-
);
799-
if self.is_log_batch {
800-
// Remember file actions from this batch so we can ignore duplicates as we process
801-
// batches from older commit and/or checkpoint files. We don't track checkpoint
802-
// batches because they are already the oldest actions and never replace anything.
803-
self.seen_file_keys.insert(key);
804-
}
805-
false
806-
}
807-
}
808-
809-
/// Extract deletion vector unique ID
810-
fn extract_dv_unique_id<'a>(
811-
&self,
812-
i: usize,
813-
getters: &[&'a dyn GetData<'a>],
814-
add_dv_start_index: Option<usize>,
815-
remove_dv_start_index: Option<usize>,
816-
) -> DeltaResult<Option<String>> {
817-
// Get the starting index based on action type
818-
let start_idx = add_dv_start_index
819-
.or(remove_dv_start_index)
820-
.ok_or_else(|| Error::GenericError {
821-
source: "starting indices for add/remove DVs should have been passed".into(),
822-
})?;
823-
824-
// Extract the DV unique ID
825-
match getters[start_idx].get_opt(i, "deletionVector.storageType")? {
826-
Some(storage_type) => Ok(Some(DeletionVectorDescriptor::unique_id_from_parts(
827-
storage_type,
828-
getters[start_idx + 1].get(i, "deletionVector.pathOrInlineDv")?,
829-
getters[start_idx + 2].get_opt(i, "deletionVector.offset")?,
830-
))),
831-
None => Ok(None),
832-
}
833-
}
834-
835-
/// Extracts a file action key and determines if it's an add operation.
836-
///
837-
/// This method examines the data at the given index using the provided getters and config
838-
/// to identify whether a file action exists and what type it is.
839-
///
840-
/// # Arguments
841-
///
842-
/// * `i` - Index position in the data structure to examine
843-
/// * `getters` - Collection of data getter implementations used to access the data
844-
/// * `config` - Configuration specifying where to find add/remove operations
845-
///
846-
/// # Returns
847-
///
848-
/// * `Ok(Some((key, is_add)))` - When a file action is found, returns the key and whether it's an add operation
849-
/// * `Ok(None)` - When no file action is found
850-
/// * `Err(...)` - On any error during extraction
851-
pub(crate) fn extract_file_action<'a>(
852-
&self,
853-
i: usize,
854-
getters: &[&'a dyn GetData<'a>],
855-
config: FileActionExtractConfig,
856-
) -> DeltaResult<Option<(FileActionKey, bool)>> {
857-
// Try to extract an add action path
858-
if let Some(path) = getters[config.add_path_index].get_str(i, "add.path")? {
859-
let dv_unique_id =
860-
self.extract_dv_unique_id(i, getters, Some(config.add_dv_start_index), None)?;
861-
return Ok(Some((FileActionKey::new(path, dv_unique_id), true)));
862-
}
863-
864-
// The AddRemoveDedupVisitor skips remove actions when extracting file actions from a checkpoint file.
865-
if config.skip_removes {
866-
return Ok(None);
867-
}
868-
869-
// Try to extract a remove action path
870-
if let Some(path) = getters[config.remove_path_index].get_str(i, "remove.path")? {
871-
let dv_unique_id =
872-
self.extract_dv_unique_id(i, getters, None, Some(config.remove_dv_start_index))?;
873-
return Ok(Some((FileActionKey::new(path, dv_unique_id), false)));
874-
}
875-
876-
// No file action found
877-
Ok(None)
878-
}
879-
880-
pub(crate) fn selection_vector(self) -> Vec<bool> {
881-
self.selection_vector
882-
}
883-
884-
pub(crate) fn selection_vector_ref(&self) -> &Vec<bool> {
885-
&self.selection_vector
886-
}
887-
888-
pub(crate) fn selection_vector_mut(&mut self) -> &mut Vec<bool> {
889-
&mut self.selection_vector
890-
}
891-
892-
/// Returns whether we are currently processing a log batch.
893-
pub(crate) fn is_log_batch(&self) -> bool {
894-
self.is_log_batch
895-
}
896-
}
897-
898712
/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such
899713
/// that the first element contains the `storageType` element of the deletion vector.
900714
pub(crate) fn visit_deletion_vector_at<'a>(
@@ -1206,7 +1020,7 @@ mod tests {
12061020
assert_eq!(visitor.seen_txns.len(), 1);
12071021
assert_eq!(visitor.total_non_file_actions, 3);
12081022

1209-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1023+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
12101024
Ok(())
12111025
}
12121026

@@ -1236,7 +1050,7 @@ mod tests {
12361050

12371051
// Only "one_above_threshold" should be kept
12381052
let expected = vec![false, false, true, false];
1239-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1053+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
12401054
assert_eq!(visitor.total_file_actions, 1);
12411055
assert_eq!(visitor.total_add_actions, 0);
12421056
assert_eq!(visitor.total_non_file_actions, 0);
@@ -1262,7 +1076,7 @@ mod tests {
12621076

12631077
// First one should be included, second one skipped as a duplicate
12641078
let expected = vec![true, false];
1265-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1079+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
12661080
assert_eq!(visitor.total_file_actions, 1);
12671081
assert_eq!(visitor.total_add_actions, 1);
12681082
assert_eq!(visitor.total_non_file_actions, 0);
@@ -1295,7 +1109,7 @@ mod tests {
12951109

12961110
// Both should be included since we don't track duplicates in checkpoint batches
12971111
let expected = vec![true, true];
1298-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1112+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
12991113
assert_eq!(visitor.total_file_actions, 2);
13001114
assert_eq!(visitor.total_add_actions, 2);
13011115
assert_eq!(visitor.total_non_file_actions, 0);
@@ -1322,7 +1136,7 @@ mod tests {
13221136
visitor.visit_rows_of(batch.as_ref())?;
13231137

13241138
let expected = vec![true, true, false]; // Third one is a duplicate
1325-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1139+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
13261140
assert_eq!(visitor.total_file_actions, 2);
13271141
assert_eq!(visitor.total_add_actions, 2);
13281142
assert_eq!(visitor.total_non_file_actions, 0);
@@ -1347,7 +1161,7 @@ mod tests {
13471161
visitor.visit_rows_of(batch.as_ref())?;
13481162

13491163
let expected = vec![true, true, true];
1350-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1164+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
13511165
assert!(visitor.seen_protocol);
13521166
assert!(visitor.seen_metadata);
13531167
assert_eq!(visitor.seen_txns.len(), 1);
@@ -1387,7 +1201,7 @@ mod tests {
13871201

13881202
// All actions should be skipped as they have already been seen
13891203
let expected = vec![false, false, false];
1390-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1204+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
13911205
assert_eq!(visitor.total_non_file_actions, 0);
13921206
assert_eq!(visitor.total_file_actions, 0);
13931207

@@ -1423,7 +1237,7 @@ mod tests {
14231237

14241238
// First occurrence of each type should be included
14251239
let expected = vec![true, false, true, true, false, true, false];
1426-
assert_eq!(visitor.deduplicator.selection_vector, expected);
1240+
assert_eq!(visitor.deduplicator.selection_vector(), expected);
14271241
assert_eq!(visitor.seen_txns.len(), 2); // Two different app IDs
14281242
assert_eq!(visitor.total_non_file_actions, 4); // 2 txns + 1 protocol + 1 metadata
14291243
assert_eq!(visitor.total_file_actions, 0);

kernel/src/scan/log_replay.rs

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use itertools::Itertools;
77
use super::data_skipping::DataSkippingFilter;
88
use super::{ScanData, Transform};
99
use crate::actions::get_log_add_schema;
10-
use crate::actions::visitors::{FileActionDeduplicator, FileActionExtractConfig, FileActionKey};
1110
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
1211
use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef};
1312
use crate::log_replay::{FileActionDeduplicator, FileActionKey};

0 commit comments

Comments
 (0)