9 changes: 7 additions & 2 deletions kernel/src/log_replay.rs
@@ -310,13 +310,18 @@ pub(crate) trait LogReplayProcessor: Sized {
     ///
     /// # Parameters
     /// - `batch`: A reference to the batch of actions to be processed.
+    /// - `is_log_batch`: Whether this batch is from a commit log (`true`) or checkpoint (`false`).
     ///
     /// # Returns
     /// A `DeltaResult<Vec<bool>>`, where each boolean indicates if the corresponding row should be included.
     /// If no filter is provided, all rows are selected.
-    fn build_selection_vector(&self, batch: &dyn EngineData) -> DeltaResult<Vec<bool>> {
+    fn build_selection_vector(
+        &self,
+        batch: &dyn EngineData,
+        is_log_batch: bool,
+    ) -> DeltaResult<Vec<bool>> {
         match self.data_skipping_filter() {
-            Some(filter) => filter.apply(batch),
+            Some(filter) => filter.apply(batch, is_log_batch),
             None => Ok(vec![true; batch.len()]), // If no filter is provided, select all rows
         }
     }
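The hunk above threads a new `is_log_batch` flag from `build_selection_vector` into the filter's `apply`. Below is a minimal, self-contained sketch of that flow; `MockFilter` and its skipping rule are hypothetical stand-ins, not the crate's real `DataSkippingFilter`/`EngineData` API (in the kernel, the flag plausibly lets the filter choose a stats source per batch kind, e.g. pre-parsed checkpoint stats vs. JSON commit stats).

// Hypothetical mock of the changed contract; not delta-kernel's real types.
struct MockFilter;

impl MockFilter {
    // Mirrors the new `apply(batch, is_log_batch)` signature. Here the flag
    // just toggles a dummy rule for illustration.
    fn apply(&self, batch: &[i64], is_log_batch: bool) -> Vec<bool> {
        batch.iter().map(|v| is_log_batch || *v != 0).collect()
    }
}

fn build_selection_vector(
    filter: Option<&MockFilter>,
    batch: &[i64],
    is_log_batch: bool,
) -> Vec<bool> {
    match filter {
        Some(f) => f.apply(batch, is_log_batch),
        None => vec![true; batch.len()], // no filter: select all rows
    }
}

fn main() {
    let filter = MockFilter;
    // Commit (log) batch: every row selected under this mock rule.
    assert_eq!(build_selection_vector(Some(&filter), &[0, 7], true), [true, true]);
    // Checkpoint batch: the zero-valued row is filtered out.
    assert_eq!(build_selection_vector(Some(&filter), &[0, 7], false), [false, true]);
}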
192 changes: 150 additions & 42 deletions kernel/src/log_segment.rs

Large diffs are not rendered by default.
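The collapsed diff is where this PR's main changes live, but two pieces of its new surface are visible from the rendered call sites: `create_checkpoint_stream` takes an extra (fourth) argument and now returns a struct whose `actions` field is the batch iterator, and a `CheckpointReadInfo` struct records whether checkpoints can be read with pre-parsed stats. The sketch below is a reconstruction from usage only, not the real definition; `SchemaRef` is replaced with a placeholder type.

// Reconstructed from this PR's call sites; the real definition is in the
// collapsed kernel/src/log_segment.rs diff.
use std::sync::Arc;

type SchemaRef = Arc<str>; // placeholder for the kernel's schema handle

pub struct CheckpointReadInfo {
    // True when the checkpoint schema carries a usable `add.stats_parsed`
    // column (see the schema_has_compatible_stats_parsed tests below).
    pub has_stats_parsed: bool,
    // Schema used to read checkpoint files; the tests below fall back to the
    // base log add schema when stats_parsed is absent or incompatible.
    pub checkpoint_read_schema: SchemaRef,
}

fn main() {
    let info = CheckpointReadInfo {
        has_stats_parsed: false,
        checkpoint_read_schema: Arc::from("log add schema (placeholder)"),
    };
    assert!(!info.has_stats_parsed);
}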

78 changes: 56 additions & 22 deletions kernel/src/log_segment/tests.rs
@@ -1170,8 +1170,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schem
         None,
         None,
     )?;
-    let mut iter =
-        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
+    let checkpoint_result = log_segment.create_checkpoint_stream(
+        &engine,
+        v2_checkpoint_read_schema.clone(),
+        None,
+        None,
+    )?;
+    let mut iter = checkpoint_result.actions;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let ActionsBatch {
@@ -1235,8 +1240,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_
         None,
         None,
     )?;
-    let mut iter =
-        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
+    let checkpoint_result = log_segment.create_checkpoint_stream(
+        &engine,
+        v2_checkpoint_read_schema.clone(),
+        None,
+        None,
+    )?;
+    let mut iter = checkpoint_result.actions;
 
     // Assert the correctness of batches returned
     for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() {
@@ -1295,8 +1305,13 @@ async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_si
         None,
         None,
     )?;
-    let mut iter =
-        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
+    let checkpoint_result = log_segment.create_checkpoint_stream(
+        &engine,
+        v2_checkpoint_read_schema.clone(),
+        None,
+        None,
+    )?;
+    let mut iter = checkpoint_result.actions;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let ActionsBatch {
@@ -1344,8 +1359,9 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidec
         None,
         None,
     )?;
-    let mut iter =
-        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None)?;
+    let checkpoint_result =
+        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None, None)?;
+    let mut iter = checkpoint_result.actions;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let ActionsBatch {
@@ -1431,8 +1447,13 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar
         None,
         None,
     )?;
-    let mut iter =
-        log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
+    let checkpoint_result = log_segment.create_checkpoint_stream(
+        &engine,
+        v2_checkpoint_read_schema.clone(),
+        None,
+        None,
+    )?;
+    let mut iter = checkpoint_result.actions;
 
     // Assert that the first batch returned is from reading checkpoint file 1
     let ActionsBatch {
@@ -2709,6 +2730,18 @@ fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec<StructField
     StructType::new_unchecked([StructField::nullable("add", add_schema)])
 }
 
+// Helper to create a stats_schema with proper structure (numRecords, minValues, maxValues)
+fn create_stats_schema(column_fields: Vec<StructField>) -> StructType {
+    StructType::new_unchecked([
+        StructField::nullable("numRecords", DataType::LONG),
+        StructField::nullable(
+            "minValues",
+            StructType::new_unchecked(column_fields.clone()),
+        ),
+        StructField::nullable("maxValues", StructType::new_unchecked(column_fields)),
+    ])
+}
+
 // Helper to create a checkpoint schema without stats_parsed
 fn create_checkpoint_schema_without_stats_parsed() -> StructType {
     use crate::schema::StructType;
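The new helper mirrors the layout of Delta per-file statistics: a top-level `numRecords` alongside per-column bounds nested under `minValues` and `maxValues`. As a usage sketch (values illustrative), stats such as `{"numRecords": 3, "minValues": {"id": 1}, "maxValues": {"id": 7}}` fit the schema built by:

// Usage sketch for the helper above; `id` is an arbitrary example column.
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);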
@@ -2731,15 +2764,15 @@ fn test_schema_has_compatible_stats_parsed_basic() {
     )]);
 
     // Exact type match should work
-    let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
+    let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
     assert!(LogSegment::schema_has_compatible_stats_parsed(
         &checkpoint_schema,
         &stats_schema
     ));
 
     // Type widening (int -> long) should work
     let stats_schema_widened =
-        StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]);
+        create_stats_schema(vec![StructField::nullable("id", DataType::LONG)]);
     assert!(LogSegment::schema_has_compatible_stats_parsed(
         &checkpoint_schema,
         &stats_schema_widened
@@ -2759,17 +2792,18 @@
 
 #[test]
 fn test_schema_has_compatible_stats_parsed_missing_column_ok() {
-    // Checkpoint has "id" column, stats schema needs "other" column (missing in checkpoint is OK)
+    // Checkpoint has "id" column, stats schema needs "other" column
+    // Missing column is acceptable - it will return null when accessed
     let checkpoint_schema =
         create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
             "id",
             DataType::INTEGER,
         )]);
 
-    let stats_schema =
-        StructType::new_unchecked([StructField::nullable("other", DataType::INTEGER)]);
+    let stats_schema = create_stats_schema(vec![StructField::nullable("other", DataType::INTEGER)]);
 
-    // Missing column in checkpoint is OK - it will just return NULL
+    // Missing column in checkpoint is OK - it will return null when accessed,
+    // which is acceptable for data skipping (just means we can't skip based on that column)
     assert!(LogSegment::schema_has_compatible_stats_parsed(
         &checkpoint_schema,
         &stats_schema
@@ -2784,7 +2818,7 @@ fn test_schema_has_compatible_stats_parsed_extra_column_ok() {
         StructField::nullable("extra", DataType::STRING),
     ]);
 
-    let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
+    let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
 
     assert!(LogSegment::schema_has_compatible_stats_parsed(
         &checkpoint_schema,
@@ -2797,7 +2831,7 @@ fn test_schema_has_compatible_stats_parsed_no_stats_parsed() {
     // Checkpoint schema without stats_parsed field
     let checkpoint_schema = create_checkpoint_schema_without_stats_parsed();
 
-    let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
+    let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
 
     assert!(!LogSegment::schema_has_compatible_stats_parsed(
         &checkpoint_schema,
@@ -2814,7 +2848,7 @@
         DataType::INTEGER,
     )]);
 
-    let stats_schema = StructType::new_unchecked([]);
+    let stats_schema = create_stats_schema(vec![]);
 
     // If no columns are needed for data skipping, any stats_parsed is compatible
     assert!(LogSegment::schema_has_compatible_stats_parsed(
@@ -2832,7 +2866,7 @@ fn test_schema_has_compatible_stats_parsed_multiple_columns() {
     ]);
 
     // First column matches, second is incompatible
-    let stats_schema = StructType::new_unchecked([
+    let stats_schema = create_stats_schema(vec![
         StructField::nullable("good_col", DataType::LONG),
         StructField::nullable("bad_col", DataType::INTEGER),
     ]);
@@ -2858,7 +2892,7 @@ fn test_schema_has_compatible_stats_parsed_missing_min_max_values() {
 
     let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);
 
-    let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
+    let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
 
     // Should return true - missing minValues/maxValues is handled gracefully with continue
     assert!(LogSegment::schema_has_compatible_stats_parsed(
@@ -2884,7 +2918,7 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() {
 
     let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);
 
-    let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
+    let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
 
     // Should return false - minValues/maxValues must be Struct types
     assert!(!LogSegment::schema_has_compatible_stats_parsed(
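Taken together, these tests pin down the contract of `schema_has_compatible_stats_parsed`: `add.stats_parsed` must exist; if `minValues`/`maxValues` are present they must be structs; and each column the stats schema needs must either be missing from the checkpoint (it reads as null, so that column simply cannot drive skipping) or have an exact or widenable type (e.g. INTEGER to LONG). Since the implementation itself sits in the collapsed `log_segment.rs` diff, here is a self-contained toy model of those rules; its types are simplified stand-ins, not delta-kernel's schema types.

// Toy model of the compatibility rules asserted by the tests above.
use std::collections::HashMap;

#[derive(PartialEq)]
enum Ty {
    Int,
    Long,
    Struct(HashMap<String, Ty>),
}

// INTEGER may widen to LONG; otherwise types must match exactly.
fn compatible(parsed: &Ty, needed: &Ty) -> bool {
    parsed == needed || matches!((parsed, needed), (Ty::Int, Ty::Long))
}

fn has_compatible_stats_parsed(
    stats_parsed: Option<&Ty>,
    needed_cols: &HashMap<String, Ty>,
) -> bool {
    // Rule 1: the checkpoint must expose stats_parsed at all.
    let Some(Ty::Struct(bounds)) = stats_parsed else {
        return false;
    };
    for bound in ["minValues", "maxValues"] {
        match bounds.get(bound) {
            None => continue, // missing minValues/maxValues is tolerated
            Some(Ty::Struct(parsed_cols)) => {
                for (name, needed_ty) in needed_cols {
                    match parsed_cols.get(name) {
                        None => continue, // missing column reads as null: OK
                        Some(parsed_ty) if compatible(parsed_ty, needed_ty) => {}
                        Some(_) => return false, // incompatible column type
                    }
                }
            }
            Some(_) => return false, // bound present but not a struct
        }
    }
    true
}

fn main() {
    let needed: HashMap<_, _> = [("id".to_string(), Ty::Long)].into();
    // Int widens to Long, so this checkpoint is compatible.
    let parsed = Ty::Struct(
        [(
            "minValues".to_string(),
            Ty::Struct([("id".to_string(), Ty::Int)].into()),
        )]
        .into(),
    );
    assert!(has_compatible_stats_parsed(Some(&parsed), &needed));
    // No stats_parsed at all: incompatible.
    assert!(!has_compatible_stats_parsed(None, &needed));
}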
19 changes: 16 additions & 3 deletions kernel/src/parallel/sequential_phase.rs
@@ -201,10 +201,11 @@ impl<P: LogReplayProcessor> Iterator for SequentialPhase<P> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::actions::get_log_add_schema;
+    use crate::log_segment::CheckpointReadInfo;
     use crate::scan::log_replay::ScanLogReplayProcessor;
     use crate::scan::state_info::StateInfo;
     use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
-    use std::sync::Arc;
 
     /// Core helper function to verify sequential processing with expected adds and sidecars.
     fn verify_sequential_processing(
@@ -222,7 +223,13 @@ mod tests {
             (),
         )?);
 
-        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
+        // Use base log add schema for tests - no stats_parsed optimization
+        let checkpoint_info = CheckpointReadInfo {
+            has_stats_parsed: false,
+            checkpoint_read_schema: get_log_add_schema().clone(),
+        };
+
+        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?;
         let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?;
 
         // Process all batches and collect Add file paths
@@ -313,7 +320,13 @@ mod tests {
             (),
         )?);
 
-        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
+        // Use base log add schema for tests - no stats_parsed optimization
+        let checkpoint_info = CheckpointReadInfo {
+            has_stats_parsed: false,
+            checkpoint_read_schema: get_log_add_schema().clone(),
+        };
+
+        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?;
        let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?;
 
        // Call next() once but don't exhaust the iterator
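These tests deliberately opt out of the `stats_parsed` path by hard-coding `has_stats_parsed: false` with the base log add schema. Production call sites (in the collapsed `log_segment.rs` diff) would presumably flip the flag only after the schema compatibility check passes; a hedged sketch of that gating, with `checkpoint_schema` and `stats_schema` as assumed inputs, might look like:

// Hypothetical gating sketch; the real wiring lives in the collapsed diff.
let has_stats_parsed =
    LogSegment::schema_has_compatible_stats_parsed(&checkpoint_schema, &stats_schema);
let checkpoint_info = CheckpointReadInfo {
    has_stats_parsed,
    // With compatible stats_parsed, a richer checkpoint read schema could be
    // used here; otherwise fall back to the base log add schema.
    checkpoint_read_schema: get_log_add_schema().clone(),
};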