Merged
47 changes: 20 additions & 27 deletions kernel/src/checkpoint/log_replay.rs
@@ -120,7 +120,7 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor {

Ok(CheckpointBatch {
filtered_data,
actions_count: visitor.non_file_actions_count + visitor.file_actions_count,
actions_count: visitor.actions_count,
add_actions_count: visitor.add_actions_count,
})
}
@@ -193,9 +193,7 @@ pub(crate) struct CheckpointVisitor<'seen> {
// TODO: _last_checkpoint schema should be updated to use u64 instead of i64
// for fields that are not expected to be negative. (Issue #786)
// i64 to match the `_last_checkpoint` file schema
non_file_actions_count: i64,
// i64 to match the `_last_checkpoint` file schema
file_actions_count: i64,
actions_count: i64,
// i64 to match the `_last_checkpoint` file schema
add_actions_count: i64,
// i64 for comparison with remove.deletionTimestamp
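The counters stay i64 to line up with the `_last_checkpoint` file schema, as the TODO above notes. A minimal sketch of that mapping, using a hypothetical hint struct rather than the kernel's actual type:

// Hypothetical illustration only: the consolidated counters feed the i64 fields of a
// _last_checkpoint-style hint, which is why the visitor keeps them as i64 even though
// the values are never negative (Issue #786).
struct LastCheckpointHint {
    version: i64,
    size: i64,                     // total number of actions written to the checkpoint
    num_of_add_files: Option<i64>, // number of add actions
}

fn to_hint(version: i64, actions_count: i64, add_actions_count: i64) -> LastCheckpointHint {
    LastCheckpointHint {
        version,
        size: actions_count,
        num_of_add_files: Some(add_actions_count),
    }
}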
@@ -243,8 +241,7 @@ impl CheckpointVisitor<'_> {
Self::REMOVE_DV_START_INDEX,
),
selection_vector,
non_file_actions_count: 0,
file_actions_count: 0,
actions_count: 0,
add_actions_count: 0,
minimum_file_retention_timestamp,
seen_protocol,
@@ -304,7 +301,6 @@ impl CheckpointVisitor<'_> {
} else if self.is_expired_tombstone(i, getters[Self::REMOVE_DELETION_TIMESTAMP_INDEX])? {
return Ok(false); // Skip expired remove tombstones
}
self.file_actions_count += 1;
Ok(true) // Include this action
}

@@ -332,7 +328,6 @@ impl CheckpointVisitor<'_> {
}
// Valid, non-duplicate protocol action to be included
self.seen_protocol = true;
self.non_file_actions_count += 1;
Ok(true)
}

@@ -358,7 +353,6 @@ impl CheckpointVisitor<'_> {

// Valid, non-duplicate metadata action to be included
self.seen_metadata = true;
self.non_file_actions_count += 1;
Ok(true)
}

@@ -380,7 +374,6 @@ impl CheckpointVisitor<'_> {
}

// Valid, non-duplicate txn action to be included
self.non_file_actions_count += 1;
Ok(true)
}

@@ -402,10 +395,16 @@ impl CheckpointVisitor<'_> {
) -> DeltaResult<bool> {
// The `||` operator short-circuits the evaluation, so if any of the checks return true,
// the rest will not be evaluated.
Ok(self.check_file_action(i, getters)?
let is_valid = self.check_file_action(i, getters)?
|| self.check_txn_action(i, getters[11])?
|| self.check_protocol_action(i, getters[10])?
|| self.check_metadata_action(i, getters[9])?)
|| self.check_metadata_action(i, getters[9])?;

if is_valid {
self.actions_count += 1;
}

Ok(is_valid)
}
}
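A standalone sketch of the pattern introduced in this hunk, with illustrative names rather than the kernel's API: the `||` chain short-circuits, so at most one check claims the row, and a single counter is bumped exactly once per selected action regardless of its kind.

// Illustrative only: each check may fail, `?` propagates errors, and `||` short-circuits,
// so later checks never run once one of them selects the row.
struct RowCounter {
    actions_count: i64,
}

impl RowCounter {
    fn visit(
        &mut self,
        is_file: impl Fn() -> Result<bool, String>,
        is_txn: impl Fn() -> Result<bool, String>,
        is_protocol: impl Fn() -> Result<bool, String>,
        is_metadata: impl Fn() -> Result<bool, String>,
    ) -> Result<bool, String> {
        let is_valid = is_file()? || is_txn()? || is_protocol()? || is_metadata()?;
        if is_valid {
            self.actions_count += 1; // one consolidated count for every selected action kind
        }
        Ok(is_valid)
    }
}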

@@ -520,12 +519,11 @@ mod tests {
false, // Row 8 is a checkpointMetadata action (excluded)
];

assert_eq!(visitor.file_actions_count, 2);
assert_eq!(visitor.actions_count, 5);
assert_eq!(visitor.add_actions_count, 1);
assert!(visitor.seen_protocol);
assert!(visitor.seen_metadata);
assert_eq!(visitor.seen_txns.len(), 1);
assert_eq!(visitor.non_file_actions_count, 3);

assert_eq!(visitor.selection_vector, expected);
Ok(())
@@ -544,7 +542,7 @@ mod tests {
r#"{"remove":{"path":"one_below_threshold","deletionTimestamp":99,"dataChange":true,"partitionValues":{}}}"#,
r#"{"remove":{"path":"one_above_threshold","deletionTimestamp":101,"dataChange":true,"partitionValues":{}}}"#,
// Missing timestamp defaults to 0
r#"{"remove":{"path":"missing_timestamp","dataChange":true,"partitionValues":{}}}"#,
r#"{"remove":{"path":"missing_timestamp","dataChange":true,"partitionValues":{}}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
@@ -566,9 +564,8 @@ mod tests {
// Only "one_above_threshold" should be kept
let expected = vec![false, false, true, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.file_actions_count, 1);
assert_eq!(visitor.actions_count, 1);
assert_eq!(visitor.add_actions_count, 0);
assert_eq!(visitor.non_file_actions_count, 0);
Ok(())
}
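The retention rule this test exercises, as a hedged sketch: the threshold of 100 is implied by the row names and the setup hidden above this hunk, and the exact boundary comparison is an assumption here, not taken from the diff.

// Sketch only: a remove tombstone is dropped from the checkpoint once its
// deletionTimestamp falls at or below the minimum file retention timestamp.
fn is_expired_tombstone(deletion_timestamp: Option<i64>, minimum_file_retention_timestamp: i64) -> bool {
    // A missing deletionTimestamp is treated as 0 and is therefore always expired.
    deletion_timestamp.unwrap_or(0) <= minimum_file_retention_timestamp
}
// With an assumed threshold of 100: 99 and the missing timestamp (0) are dropped,
// while 101 survives, which is why only "one_above_threshold" stays selected above.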

@@ -596,9 +593,8 @@ mod tests {

let expected = vec![true];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.file_actions_count, 1);
assert_eq!(visitor.actions_count, 1);
assert_eq!(visitor.add_actions_count, 1);
assert_eq!(visitor.non_file_actions_count, 0);
// The action should NOT be added to the seen_file_keys set as it's a checkpoint batch
// and actions in checkpoint batches do not conflict with each other.
// This is a key difference from log batches, where actions can conflict.
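A minimal sketch of the dedup behavior those comments describe, using a hypothetical helper rather than the kernel's actual deduplicator: file keys are recorded only while replaying log batches, while a checkpoint batch merely consults the set, since actions within one checkpoint cannot conflict with each other.

use std::collections::HashSet;

// Hypothetical helper for illustration; `key` would identify a file action.
fn check_and_record(seen: &mut HashSet<String>, key: String, is_log_batch: bool) -> bool {
    if seen.contains(&key) {
        return true; // a newer action already claimed this file; caller should skip the row
    }
    if is_log_batch {
        // Only log batches record keys: older batches replayed later may conflict with
        // this one, whereas a checkpoint batch never conflicts with itself.
        seen.insert(key);
    }
    false
}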
@@ -610,7 +606,7 @@ mod tests {
fn test_checkpoint_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
let json_strings: StringArray = vec![
// Add action for file1 with deletion vector
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
// Remove action for file1 with a different deletion vector
r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
// Remove action for file1 with another different deletion vector
@@ -635,9 +631,8 @@ mod tests {

let expected = vec![true, true, true];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.file_actions_count, 3);
assert_eq!(visitor.actions_count, 3);
assert_eq!(visitor.add_actions_count, 1);
assert_eq!(visitor.non_file_actions_count, 0);

Ok(())
}
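Why all three rows above stay selected, sketched with a hypothetical key type (the kernel's real key may differ): if the dedup key pairs the path with a deletion-vector identifier, the same path carrying three different deletion vectors yields three distinct keys, so none of the actions suppresses another.

// Hypothetical key type for illustration; equality is derived over both fields, so
// "file1" with three different deletion vectors hashes to three different entries.
#[derive(PartialEq, Eq, Hash)]
struct FileKey {
    path: String,
    dv_unique_id: Option<String>,
}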
@@ -671,8 +666,7 @@ mod tests {
// All actions should be skipped as they have already been seen
let expected = vec![false, false, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.non_file_actions_count, 0);
assert_eq!(visitor.file_actions_count, 0);
assert_eq!(visitor.actions_count, 0);

Ok(())
}
@@ -687,7 +681,7 @@ mod tests {
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7}}"#, // Duplicate protocol
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
// Duplicate metadata
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
]
.into();
let batch = parse_json_batch(json_strings);
@@ -710,8 +704,7 @@ mod tests {
let expected = vec![true, false, true, true, false, true, false];
assert_eq!(visitor.selection_vector, expected);
assert_eq!(visitor.seen_txns.len(), 2); // Two different app IDs
assert_eq!(visitor.non_file_actions_count, 4); // 2 txns + 1 protocol + 1 metadata
assert_eq!(visitor.file_actions_count, 0);
assert_eq!(visitor.actions_count, 4); // 2 txns + 1 protocol + 1 metadata

Ok(())
}