Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
127 commits
Select commit Hold shift + click to select a range
435302e
introduce visitors
sebastiantia Mar 12, 2025
e500a10
remove pub
sebastiantia Mar 12, 2025
19733cd
assert! instead of assert_eq with bool
sebastiantia Mar 12, 2025
87c9f31
log replay for checkpoints
sebastiantia Mar 13, 2025
db5ccd0
rename & some clean up
sebastiantia Mar 13, 2025
42c08c1
remove new path for now
sebastiantia Mar 13, 2025
f91baeb
merge non file action visitor tests
sebastiantia Mar 22, 2025
9fdfba7
mvp for refactor
sebastiantia Mar 24, 2025
d420fd1
these github action checks clog my screen
sebastiantia Mar 24, 2025
9e0e048
base file actions struct
sebastiantia Mar 25, 2025
303444b
combine visitors
sebastiantia Mar 25, 2025
5dbc924
fmt
sebastiantia Mar 26, 2025
b793961
remove old code
sebastiantia Mar 26, 2025
508976f
move FileActionKey
sebastiantia Mar 26, 2025
bccaa17
Merge branch 'main' into checkpoint-visitors
sebastiantia Mar 26, 2025
a23d7cb
merge
sebastiantia Mar 26, 2025
0160ef1
fix whitespace
sebastiantia Mar 26, 2025
aae7046
remove old code
sebastiantia Mar 26, 2025
f574370
refactor more
sebastiantia Mar 26, 2025
a618833
refactor
sebastiantia Mar 26, 2025
7da74b2
more docs
sebastiantia Mar 26, 2025
220a216
invert is_log_batch logic
sebastiantia Mar 26, 2025
9d86911
docs
sebastiantia Mar 26, 2025
e5b0e32
docs
sebastiantia Mar 26, 2025
a5393dc
docs and imports
sebastiantia Mar 26, 2025
a23c651
improve mod doc
sebastiantia Mar 27, 2025
d712d18
improve doc
sebastiantia Mar 27, 2025
e564ae1
docs'
sebastiantia Mar 27, 2025
b14ff19
docs
sebastiantia Mar 27, 2025
a52d484
update
sebastiantia Mar 27, 2025
a243a98
nits
sebastiantia Mar 27, 2025
9f06382
Revert "nits"
sebastiantia Mar 28, 2025
58f38c0
nits
sebastiantia Mar 28, 2025
628546c
refactor
sebastiantia Mar 27, 2025
88cf983
move
sebastiantia Mar 27, 2025
10bb7b5
fix rebase
sebastiantia Mar 28, 2025
4b5a3a9
introduce visitors
sebastiantia Mar 12, 2025
1cb9364
assert! instead of assert_eq with bool
sebastiantia Mar 12, 2025
797a05c
merge non file action visitor tests
sebastiantia Mar 22, 2025
45c698d
base file actions struct
sebastiantia Mar 25, 2025
b062125
combine visitors
sebastiantia Mar 25, 2025
90e46cd
fmt
sebastiantia Mar 26, 2025
3c25392
remove old code
sebastiantia Mar 26, 2025
cba8ed6
move FileActionKey
sebastiantia Mar 26, 2025
28f1fb4
fix merge
sebastiantia Mar 27, 2025
48f831a
doc
sebastiantia Mar 27, 2025
7c3d976
docs
sebastiantia Mar 28, 2025
b2bb0ce
fix rebase
sebastiantia Mar 28, 2025
0054c71
merge
sebastiantia Mar 28, 2025
abc7e1f
merge fixes
sebastiantia Mar 28, 2025
964f294
docs
sebastiantia Mar 30, 2025
c026258
clean up and docs
sebastiantia Mar 30, 2025
88ba96c
docs
sebastiantia Mar 30, 2025
4c98c84
docs
sebastiantia Mar 30, 2025
c7cd2d1
Merge branch 'extract-deduplication-logic-from-addRemoveDedupVisitor'…
sebastiantia Mar 30, 2025
542166c
merge
sebastiantia Apr 1, 2025
655ed1d
fix merge
sebastiantia Apr 1, 2025
6c222a3
crate mod
sebastiantia Apr 1, 2025
30bd7d6
dev vis
sebastiantia Apr 1, 2025
159b0dd
merge
sebastiantia Apr 1, 2025
5777e5a
improve docs
sebastiantia Apr 1, 2025
5e6695b
Merge branch 'extract-log-replay-processing-structure' into checkpoin…
sebastiantia Apr 1, 2025
bdbc3fb
docs
sebastiantia Apr 1, 2025
6491113
breaking merge
sebastiantia Apr 1, 2025
95d0164
accept metadata & protocol param
sebastiantia Apr 1, 2025
51104aa
Merge branch 'checkpoint-visitors' into checkpoint-replay
sebastiantia Apr 1, 2025
7a59eab
improve docs
sebastiantia Apr 1, 2025
e4bc34e
docs
sebastiantia Apr 1, 2025
d24a80c
refactor into checkpoint mod
sebastiantia Apr 1, 2025
1981ab4
refactor into test_utils
sebastiantia Apr 1, 2025
f084424
rebase on test-utils refactor
sebastiantia Apr 2, 2025
6a28d99
merge
sebastiantia Apr 2, 2025
3488318
merge
sebastiantia Apr 2, 2025
c4e5522
redundant docs
sebastiantia Apr 2, 2025
18d1a29
fix doc
sebastiantia Apr 2, 2025
92b7296
Merge branch 'main' into checkpoint-visitors
sebastiantia Apr 2, 2025
6167cf2
merge
sebastiantia Apr 2, 2025
0d8b3c0
hoist selection vector and data skipping filter
sebastiantia Apr 3, 2025
43760a5
docs
sebastiantia Apr 3, 2025
1137be6
refactorg
sebastiantia Apr 3, 2025
6e3d722
docs
sebastiantia Apr 3, 2025
2252cec
match simplification
sebastiantia Apr 3, 2025
09f3930
docs
sebastiantia Apr 3, 2025
3efeef6
docs and rename
sebastiantia Apr 4, 2025
63f0294
nits and renames
sebastiantia Apr 4, 2025
fab97ba
rename
sebastiantia Apr 4, 2025
f79d9a5
priv mod
sebastiantia Apr 4, 2025
568b59e
docs
sebastiantia Apr 5, 2025
bce9384
clean up docs
sebastiantia Apr 6, 2025
87b17d4
polish docs
sebastiantia Apr 6, 2025
d8df2ea
notes
sebastiantia Apr 6, 2025
7f49ccd
fix indentation
sebastiantia Apr 6, 2025
e809306
merge
sebastiantia Apr 6, 2025
c9f6edd
bool flags
sebastiantia Apr 6, 2025
3f8a69b
Merge branch 'extract-log-replay-processing-structure' into checkpoin…
sebastiantia Apr 6, 2025
e520d1f
remove atomic counters
sebastiantia Apr 6, 2025
f31e51d
box counters
sebastiantia Apr 6, 2025
79d6ff8
review
sebastiantia Apr 7, 2025
a3cf0f2
revert
sebastiantia Apr 7, 2025
4416968
rc<refcell>
sebastiantia Apr 7, 2025
20fe7fe
unignore
sebastiantia Apr 7, 2025
29489d7
fix docs
sebastiantia Apr 7, 2025
ae22a6b
merge
sebastiantia Apr 7, 2025
5ccde93
oops
sebastiantia Apr 7, 2025
00c834b
docs
sebastiantia Apr 7, 2025
3c11320
clean up doc & test
sebastiantia Apr 7, 2025
4f61757
clean up docs
sebastiantia Apr 7, 2025
fdd4f68
update docs
sebastiantia Apr 7, 2025
f3257b4
Merge branch 'main' into checkpoint-visitors
sebastiantia Apr 7, 2025
9c992fc
merge
sebastiantia Apr 7, 2025
2aec9c3
remove mod docs in this PR
sebastiantia Apr 8, 2025
2e2062f
update docs
sebastiantia Apr 8, 2025
c92ea56
Merge branch 'checkpoint-visitors' into checkpoint-replay
sebastiantia Apr 8, 2025
fcb289d
docs
sebastiantia Apr 8, 2025
4d2029e
docs
sebastiantia Apr 9, 2025
e0d81ab
docs
sebastiantia Apr 9, 2025
e9de5bc
arc
sebastiantia Apr 9, 2025
0b609d5
merge
sebastiantia Apr 10, 2025
ab0a373
docs
sebastiantia Apr 10, 2025
9a9697a
test coverage
sebastiantia Apr 10, 2025
c7630a3
doc
sebastiantia Apr 10, 2025
48a0153
review
sebastiantia Apr 11, 2025
4a1a1dd
schema spec
sebastiantia Apr 11, 2025
4d48a8a
pub crate
sebastiantia Apr 11, 2025
411b2c4
forgot to include this file
sebastiantia Apr 11, 2025
48d529d
review
sebastiantia Apr 14, 2025
6a672d8
Merge branch 'main' into checkpoint-replay
sebastiantia Apr 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 70 additions & 76 deletions kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;

use tracing::debug;

use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::log_replay::FileActionKey;
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
use crate::{DeltaResult, Error};
Expand Down Expand Up @@ -502,27 +504,29 @@ impl RowVisitor for SidecarVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct CheckpointVisitor<'seen> {
// File actions deduplication state
deduplicator: FileActionDeduplicator<'seen>,
total_file_actions: usize,
total_add_actions: usize,
minimum_file_retention_timestamp: i64,
pub(crate) deduplicator: FileActionDeduplicator<'seen>,
pub(crate) total_file_actions: usize,
pub(crate) total_add_actions: usize,
pub(crate) minimum_file_retention_timestamp: i64,

// Non-file actions deduplication state
seen_protocol: bool,
seen_metadata: bool,
seen_txns: &'seen mut HashSet<String>,
total_non_file_actions: usize,
pub(crate) seen_protocol: bool,
pub(crate) seen_metadata: bool,
pub(crate) seen_txns: &'seen mut HashSet<String>,
pub(crate) total_non_file_actions: usize,
}

#[allow(unused)]
impl CheckpointVisitor<'_> {
/// Create a new CheckpointVisitor
fn new<'seen>(
pub(crate) fn new<'seen>(
seen_file_keys: &'seen mut HashSet<FileActionKey>,
seen_txns: &'seen mut HashSet<String>,
selection_vector: Vec<bool>,
is_log_batch: bool,
minimum_file_retention_timestamp: i64,
seen_protocol: bool,
seen_metadata: bool,
seen_txns: &'seen mut HashSet<String>,
) -> CheckpointVisitor<'seen> {
CheckpointVisitor {
deduplicator: FileActionDeduplicator::new(
Expand All @@ -534,8 +538,8 @@ impl CheckpointVisitor<'_> {
total_add_actions: 0,
minimum_file_retention_timestamp,

seen_protocol: false,
seen_metadata: false,
seen_protocol,
seen_metadata,
seen_txns,
total_non_file_actions: 0,
}
Expand Down Expand Up @@ -703,20 +707,6 @@ impl RowVisitor for CheckpointVisitor<'_> {
}
}

/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
/// of adds and removes during log replay.
#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) struct FileActionKey {
pub(crate) path: String,
pub(crate) dv_unique_id: Option<String>,
}
impl FileActionKey {
pub(crate) fn new(path: impl Into<String>, dv_unique_id: Option<String>) -> Self {
let path = path.into();
Self { path, dv_unique_id }
}
}

/// This struct contains indices and configuration options needed to
/// extract file actions from action batches in the Delta log.
pub(crate) struct FileActionExtractConfig {
Expand Down Expand Up @@ -923,25 +913,11 @@ pub(crate) fn visit_deletion_vector_at<'a>(

#[cfg(test)]
mod tests {
use std::sync::Arc;

use crate::arrow::array::{RecordBatch, StringArray};
use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use crate::arrow::array::StringArray;
use crate::utils::test_utils::parse_json_batch;
use crate::EngineData;

use super::*;
use crate::{
actions::get_log_schema, engine::arrow_data::ArrowEngineData, engine::sync::SyncEngine,
Engine, EngineData,
};

// TODO(nick): Merge all copies of this into one "test utils" thing
fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
let string_field = Arc::new(Field::new("a", DataType::Utf8, true));
let schema = Arc::new(ArrowSchema::new(vec![string_field]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array)])
.expect("Can't convert to record batch");
Box::new(ArrowEngineData::new(batch))
}

fn action_batch() -> Box<dyn EngineData> {
let json_strings: StringArray = vec![
Expand All @@ -958,15 +934,6 @@ mod tests {
parse_json_batch(json_strings)
}

fn parse_json_batch(json_strings: StringArray) -> Box<dyn EngineData> {
let engine = SyncEngine::new();
let json_handler = engine.get_json_handler();
let output_schema = get_log_schema().clone();
json_handler
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap()
}

#[test]
fn test_parse_protocol() -> DeltaResult<()> {
let data = action_batch();
Expand Down Expand Up @@ -1180,10 +1147,12 @@ mod tests {
let mut seen_txns = HashSet::new();
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
&mut seen_txns,
vec![false; 8],
true,
0, // minimum_file_retention_timestamp (no expired tombstones)
0, // minimum_file_retention_timestamp (no expired tombstones)
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(data.as_ref())?;
Expand Down Expand Up @@ -1226,10 +1195,12 @@ mod tests {
let mut seen_txns = HashSet::new();
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
&mut seen_txns,
vec![false; 4],
true,
100, // minimum_file_retention_timestamp (threshold set to 100)
100, // minimum_file_retention_timestamp (threshold set to 100)
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(batch.as_ref())?;
Expand All @@ -1247,16 +1218,23 @@ mod tests {
fn test_checkpoint_visitor_conflicting_file_actions_in_log_batch() -> DeltaResult<()> {
let json_strings: StringArray = vec![
r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
// Duplicate path
// Duplicate path
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
]
.into();
let batch = parse_json_batch(json_strings);

let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut visitor =
CheckpointVisitor::new(&mut seen_file_keys, &mut seen_txns, vec![false; 2], true, 0);
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
vec![false; 2],
true,
0, // minimum_file_retention_timestamp
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(batch.as_ref())?;

Expand Down Expand Up @@ -1285,10 +1263,12 @@ mod tests {
let mut seen_txns = HashSet::new();
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
&mut seen_txns,
vec![false; 2],
false, // is_log_batch = false (checkpoint batch)
0,
0, // minimum_file_retention_timestamp
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(batch.as_ref())?;
Expand Down Expand Up @@ -1316,8 +1296,15 @@ mod tests {

let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut visitor =
CheckpointVisitor::new(&mut seen_file_keys, &mut seen_txns, vec![false; 3], true, 0);
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
vec![false; 3],
true, // is_log_batch
0, // minimum_file_retention_timestamp
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(batch.as_ref())?;

Expand All @@ -1341,8 +1328,15 @@ mod tests {

let mut seen_file_keys = HashSet::new();
let mut seen_txns = HashSet::new();
let mut visitor =
CheckpointVisitor::new(&mut seen_file_keys, &mut seen_txns, vec![false; 3], true, 0);
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
vec![false; 3],
true, // is_log_batch
0, // minimum_file_retention_timestamp
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(batch.as_ref())?;

Expand Down Expand Up @@ -1373,16 +1367,14 @@ mod tests {

let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
&mut seen_txns, // Pre-populated transaction
vec![false; 3],
true,
0,
true, // is_log_batch
0, // minimum_file_retention_timestamp
true, // seen_protocol - Already seen
true, // seen_metadata - Already seen
&mut seen_txns, // Pre-populated transaction
);

// Mark these as already seen
visitor.seen_protocol = true;
visitor.seen_metadata = true;

visitor.visit_rows_of(batch.as_ref())?;

// All actions should be skipped as they have already been seen
Expand Down Expand Up @@ -1413,10 +1405,12 @@ mod tests {
let mut seen_txns = HashSet::new();
let mut visitor = CheckpointVisitor::new(
&mut seen_file_keys,
&mut seen_txns,
vec![false; 7],
true, // is_log_batch
0, // minimum_file_retention_timestamp
true, // is_log_batch
0, // minimum_file_retention_timestamp
false, // seen_protocol
false, // seen_metadata
&mut seen_txns,
);

visitor.visit_rows_of(batch.as_ref())?;
Expand All @@ -1425,7 +1419,7 @@ mod tests {
let expected = vec![true, false, true, true, false, true, false];
assert_eq!(visitor.deduplicator.selection_vector, expected);
assert_eq!(visitor.seen_txns.len(), 2); // Two different app IDs
assert_eq!(visitor.total_non_file_actions, 4); // 2 txns + 1 protocol + 1 metadata
assert_eq!(visitor.total_non_file_actions, 4);
assert_eq!(visitor.total_file_actions, 0);

Ok(())
Expand Down
Loading
Loading