diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs index 8881a2c3cf..f334b0e12b 100644 --- a/kernel/src/log_replay.rs +++ b/kernel/src/log_replay.rs @@ -310,13 +310,18 @@ pub(crate) trait LogReplayProcessor: Sized { /// /// # Parameters /// - `batch`: A reference to the batch of actions to be processed. + /// - `is_log_batch`: Whether this batch is from a commit log (`true`) or checkpoint (`false`). /// /// # Returns /// A `DeltaResult>`, where each boolean indicates if the corresponding row should be included. /// If no filter is provided, all rows are selected. - fn build_selection_vector(&self, batch: &dyn EngineData) -> DeltaResult> { + fn build_selection_vector( + &self, + batch: &dyn EngineData, + is_log_batch: bool, + ) -> DeltaResult> { match self.data_skipping_filter() { - Some(filter) => filter.apply(batch), + Some(filter) => filter.apply(batch, is_log_batch), None => Ok(vec![true; batch.len()]), // If no filter is provided, select all rows } } diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 0a4aca7a34..429227915e 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -36,6 +36,31 @@ use url::Url; #[cfg(test)] mod tests; +/// Information about checkpoint reading for data skipping optimization. +/// +/// Returned alongside the actions iterator from checkpoint reading functions. +#[derive(Debug, Clone)] +pub(crate) struct CheckpointReadInfo { + /// Whether the checkpoint has compatible pre-parsed stats for data skipping. + /// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON. + #[allow(unused)] + pub has_stats_parsed: bool, + /// The schema used to read checkpoint files, potentially including stats_parsed. + #[allow(unused)] + pub checkpoint_read_schema: SchemaRef, +} + +/// Result of reading actions from a log segment, containing both the actions iterator +/// and checkpoint metadata. +/// +/// This struct provides named access to the return values instead of tuple indexing. +pub(crate) struct ActionsWithCheckpointInfo> + Send> { + /// Iterator over action batches read from the log segment. + pub actions: A, + /// Metadata about checkpoint reading, including the schema used. + pub checkpoint_info: CheckpointReadInfo, +} + /// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files /// and commit files and guarantees the following: /// 1. Commit file versions will not have any gaps between them. @@ -384,6 +409,11 @@ impl LogSegment { /// /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the /// query's predicate, but rather a predicate for filtering log files themselves. + /// Read a stream of actions from this log segment. This returns an iterator of + /// [`ActionsBatch`]s which includes EngineData of actions + a boolean flag indicating whether + /// the data was read from a commit file (true) or a checkpoint file (false). + /// + /// Also returns `CheckpointReadInfo` with stats_parsed compatibility and the checkpoint schema. 
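Editor's note: a minimal sketch of how a caller might consume the new return type by field name rather than by tuple position. The `drive_replay` helper below is illustrative only; `ActionsWithCheckpointInfo`, `CheckpointReadInfo`, `ActionsBatch`, and `DeltaResult` are the crate items introduced or used in this diff.

```rust
use crate::log_replay::ActionsBatch;
use crate::log_segment::ActionsWithCheckpointInfo;
use crate::DeltaResult;

/// Illustrative only: drive replay from the named fields of the new return type.
fn drive_replay<A>(result: ActionsWithCheckpointInfo<A>) -> DeltaResult<()>
where
    A: Iterator<Item = DeltaResult<ActionsBatch>> + Send,
{
    let ActionsWithCheckpointInfo { actions, checkpoint_info } = result;
    if checkpoint_info.has_stats_parsed {
        // Checkpoint batches read with `checkpoint_info.checkpoint_read_schema` carry
        // `add.stats_parsed`, so data skipping can bypass parsing the JSON in `add.stats`.
    }
    for batch in actions {
        let _batch = batch?; // hand each ActionsBatch to a LogReplayProcessor
    }
    Ok(())
}
```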
#[internal_api] pub(crate) fn read_actions_with_projected_checkpoint_actions( &self, @@ -391,14 +421,24 @@ impl LogSegment { commit_read_schema: SchemaRef, checkpoint_read_schema: SchemaRef, meta_predicate: Option, - ) -> DeltaResult> + Send> { + stats_schema: Option<&StructType>, + ) -> DeltaResult< + ActionsWithCheckpointInfo> + Send>, + > { // `replay` expects commit files to be sorted in descending order, so the return value here is correct let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?; - let checkpoint_stream = - self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?; + let checkpoint_result = self.create_checkpoint_stream( + engine, + checkpoint_read_schema, + meta_predicate, + stats_schema, + )?; - Ok(commit_stream.chain(checkpoint_stream)) + Ok(ActionsWithCheckpointInfo { + actions: commit_stream.chain(checkpoint_result.actions), + checkpoint_info: checkpoint_result.checkpoint_info, + }) } // Same as above, but uses the same schema for reading checkpoints and commits. @@ -409,12 +449,14 @@ impl LogSegment { action_schema: SchemaRef, meta_predicate: Option, ) -> DeltaResult> + Send> { - self.read_actions_with_projected_checkpoint_actions( + let result = self.read_actions_with_projected_checkpoint_actions( engine, action_schema.clone(), action_schema, meta_predicate, - ) + None, + )?; + Ok(result.actions) } /// find a minimal set to cover the range of commits we want. This is greedy so not always @@ -484,7 +526,8 @@ impl LogSegment { _ => return Ok((None, vec![])), }; - // Cached hint schema for determining V1 vs V2 without footer read + // Cached hint schema for determining V1 vs V2 without footer read. + // hint_schema is Option<&SchemaRef> where SchemaRef = Arc. let hint_schema = self.checkpoint_schema.as_ref(); match checkpoint.extension.as_str() { @@ -514,12 +557,14 @@ impl LogSegment { Some(true) => { // Hint says V2 checkpoint, extract sidecars let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?; - // For V2, read first sidecar's schema + // For V2, read first sidecar's schema if sidecars exist. + // If no sidecars, V2 checkpoint may still have add actions in main file + // (like V1), so fall back to hint schema for stats_parsed check. let file_actions_schema = match sidecar_files.first() { Some(first) => { Some(engine.parquet_handler().read_parquet_footer(first)?.schema) } - None => None, + None => hint_schema.cloned(), }; Ok((file_actions_schema, sidecar_files)) } @@ -532,11 +577,14 @@ impl LogSegment { if footer.schema.field(SIDECAR_NAME).is_some() { // V2 parquet checkpoint let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?; + // For V2, read first sidecar's schema if sidecars exist. + // If no sidecars, V2 checkpoint may still have add actions in main file + // (like V1), so fall back to footer schema for stats_parsed check. let file_actions_schema = match sidecar_files.first() { Some(first) => Some( engine.parquet_handler().read_parquet_footer(first)?.schema, ), - None => None, + None => Some(footer.schema), }; Ok((file_actions_schema, sidecar_files)) } else { @@ -556,12 +604,17 @@ impl LogSegment { /// 1. Determines the files actions schema (for future stats_parsed detection) /// 2. Extracts sidecar file references if present (V2 checkpoints) /// 3. Reads checkpoint and sidecar data using cached sidecar refs + /// + /// Returns a tuple of the actions iterator and [`CheckpointReadInfo`]. 
fn create_checkpoint_stream( &self, engine: &dyn Engine, action_schema: SchemaRef, meta_predicate: Option, - ) -> DeltaResult> + Send> { + stats_schema: Option<&StructType>, + ) -> DeltaResult< + ActionsWithCheckpointInfo> + Send>, + > { let need_file_actions = schema_contains_file_actions(&action_schema); // Extract file actions schema and sidecar files @@ -574,22 +627,61 @@ impl LogSegment { (None, vec![]) }; - // (Future) Determine if there are usable parsed stats - // let _has_stats_parsed = file_actions_schema.as_ref() - // .map(|s| Self::schema_has_compatible_stats_parsed(s, stats_schema)) - // .unwrap_or(false); - let _ = file_actions_schema; // Suppress unused warning for now - - // Read the actual checkpoint files, using cached sidecar files - // We expand sidecars if we have them and need file actions - let checkpoint_read_schema = if need_file_actions - && !sidecar_files.is_empty() - && !action_schema.contains(SIDECAR_NAME) - { - Arc::new( - action_schema.add([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())])?, + // Check if checkpoint has compatible stats_parsed and add it to the schema if so + let has_stats_parsed = + stats_schema + .zip(file_actions_schema.as_ref()) + .is_some_and(|(stats, file_schema)| { + Self::schema_has_compatible_stats_parsed(file_schema, stats) + }); + + // Build final schema with any additional fields needed (stats_parsed, sidecar) + // Only modify the schema if it has an "add" field (i.e., we need file actions) + let augmented_checkpoint_read_schema = if let Some(add_field) = action_schema.field("add") { + let DataType::Struct(add_struct) = add_field.data_type() else { + return Err(Error::internal_error( + "add field in action schema must be a struct", + )); + }; + let mut add_fields: Vec = add_struct.fields().cloned().collect(); + + // Add stats_parsed if checkpoint has compatible parsed stats + if let (true, Some(stats_schema)) = (has_stats_parsed, stats_schema) { + add_fields.push(StructField::nullable( + "stats_parsed", + DataType::Struct(Box::new(stats_schema.clone())), + )); + } + + // Rebuild the add field with any new fields (stats_parsed) + let new_add_field = StructField::new( + add_field.name(), + StructType::new_unchecked(add_fields), + add_field.is_nullable(), ) + .with_metadata(add_field.metadata.clone()); + + // Rebuild schema with modified add field + let mut new_fields: Vec = action_schema + .fields() + .map(|f| { + if f.name() == "add" { + new_add_field.clone() + } else { + f.clone() + } + }) + .collect(); + + // Add sidecar column at top-level for V2 checkpoints + if need_file_actions && !sidecar_files.is_empty() { + new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())); + } + + Arc::new(StructType::new_unchecked(new_fields)) } else { + // Schema doesn't have "add" field (e.g., for metadata/protocol only reads) + // Use the action_schema as-is action_schema.clone() }; @@ -609,14 +701,14 @@ impl LogSegment { Some(parsed_log_path) if parsed_log_path.extension == "json" => { engine.json_handler().read_json_files( &checkpoint_file_meta, - checkpoint_read_schema.clone(), + augmented_checkpoint_read_schema.clone(), meta_predicate.clone(), )? 
} Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler .read_parquet_files( &checkpoint_file_meta, - checkpoint_read_schema.clone(), + augmented_checkpoint_read_schema.clone(), meta_predicate.clone(), )?, Some(parsed_log_path) => { @@ -644,7 +736,14 @@ impl LogSegment { .map_ok(|batch| ActionsBatch::new(batch, false)) .chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false))); - Ok(actions_iter) + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed, + checkpoint_read_schema: augmented_checkpoint_read_schema, + }; + Ok(ActionsWithCheckpointInfo { + actions: actions_iter, + checkpoint_info, + }) } /// Extracts sidecar file references from a checkpoint file. @@ -804,8 +903,7 @@ impl LogSegment { /// use physical column names (not logical names), so direct name comparison is correct. /// /// Returns `false` if stats_parsed doesn't exist or has incompatible types. - #[allow(dead_code)] - fn schema_has_compatible_stats_parsed( + pub(crate) fn schema_has_compatible_stats_parsed( checkpoint_schema: &StructType, stats_schema: &StructType, ) -> bool { @@ -833,42 +931,52 @@ impl LogSegment { // While these typically have the same schema, the protocol doesn't guarantee it, // so we check both to be safe. for field_name in ["minValues", "maxValues"] { - let Some(values_field) = stats_struct.field(field_name) else { + let Some(checkpoint_values_field) = stats_struct.field(field_name) else { // stats_parsed exists but no minValues/maxValues - unusual but valid continue; }; // minValues/maxValues must be a Struct containing per-column statistics. // If it exists but isn't a Struct, the schema is malformed and unusable. - let DataType::Struct(values_struct) = values_field.data_type() else { + let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else { debug!( - "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}", + "stats_parsed not compatible: stats_parsed. 
{} is not a Struct, got {:?}", field_name, - values_field.data_type() + checkpoint_values_field.data_type() ); return false; }; - // Check type compatibility for each column in the checkpoint's stats_parsed - // that also exists in the stats schema (columns needed for data skipping) - for checkpoint_field in values_struct.fields() { - if let Some(stats_field) = stats_schema.field(&checkpoint_field.name) { + // Get the corresponding field from stats_schema (e.g., stats_schema.minValues) + let Some(stats_values_field) = stats_schema.field(field_name) else { + // stats_schema doesn't have minValues/maxValues, skip this check + continue; + }; + let DataType::Struct(stats_values) = stats_values_field.data_type() else { + // stats_schema.minValues/maxValues isn't a struct - shouldn't happen but skip + continue; + }; + + // Check type compatibility for each column needed for data skipping + // If it exists in checkpoint, verify types are compatible + for stats_field in stats_values.fields() { + if let Some(checkpoint_field) = checkpoint_values.field(&stats_field.name) { if checkpoint_field .data_type() .can_read_as(stats_field.data_type()) .is_err() { debug!( - "stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema has {:?}", - checkpoint_field.name, - field_name, + "stats_parsed not compatible: incompatible type for column '{}' in {field_name}: checkpoint has {:?}, stats schema needs {:?}", + stats_field.name, checkpoint_field.data_type(), stats_field.data_type() ); return false; } } - // If column doesn't exist in stats schema, it's fine (not needed for data skipping) + // If the column is missing from checkpoint's stats_parsed, it will return + // null when accessed, which is acceptable for data skipping. 
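Editor's note: a hedged, in-crate sketch of the compatibility check in isolation, assuming the unchanged parts of the function simply navigate `add.stats_parsed` before the loops shown above. The inline schemas are illustrative; in the real flow the checkpoint schema comes from a parquet footer and the stats schema from the scan predicate.

```rust
use crate::log_segment::LogSegment;
use crate::schema::{DataType, StructField, StructType};

/// Illustrative only: a checkpoint whose `add.stats_parsed.minValues.id` is INTEGER is
/// expected to satisfy a data-skipping schema that wants `id` as LONG (type widening),
/// and a `maxValues` struct missing on both sides is tolerated by the `continue` paths.
fn example_compatibility_check() -> bool {
    let checkpoint_schema = StructType::new_unchecked([StructField::nullable(
        "add",
        StructType::new_unchecked([StructField::nullable(
            "stats_parsed",
            StructType::new_unchecked([
                StructField::nullable("numRecords", DataType::LONG),
                StructField::nullable(
                    "minValues",
                    StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]),
                ),
            ]),
        )]),
    )]);
    let stats_schema = StructType::new_unchecked([
        StructField::nullable("numRecords", DataType::LONG),
        StructField::nullable(
            "minValues",
            StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]),
        ),
    ]);
    LogSegment::schema_has_compatible_stats_parsed(&checkpoint_schema, &stats_schema)
}
```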
} } diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 3d39933cea..6fd58f7de2 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -1170,8 +1170,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schem None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let checkpoint_result = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1235,8 +1240,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_ None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let checkpoint_result = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; + let mut iter = checkpoint_result.actions; // Assert the correctness of batches returned for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() { @@ -1295,8 +1305,13 @@ async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_si None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let checkpoint_result = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1344,8 +1359,9 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidec None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None)?; + let checkpoint_result = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None, None)?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1431,8 +1447,13 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let checkpoint_result = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -2709,6 +2730,18 @@ fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec) -> StructType { + StructType::new_unchecked([ + StructField::nullable("numRecords", DataType::LONG), + StructField::nullable( + "minValues", + StructType::new_unchecked(column_fields.clone()), + ), + StructField::nullable("maxValues", StructType::new_unchecked(column_fields)), + ]) +} + // Helper to create a checkpoint schema without stats_parsed fn create_checkpoint_schema_without_stats_parsed() -> StructType { use crate::schema::StructType; @@ -2731,7 +2764,7 @@ fn test_schema_has_compatible_stats_parsed_basic() { )]); // Exact type match should work - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); 
assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, &stats_schema @@ -2739,7 +2772,7 @@ fn test_schema_has_compatible_stats_parsed_basic() { // Type widening (int -> long) should work let stats_schema_widened = - StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]); + create_stats_schema(vec![StructField::nullable("id", DataType::LONG)]); assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, &stats_schema_widened @@ -2759,17 +2792,18 @@ fn test_schema_has_compatible_stats_parsed_basic() { #[test] fn test_schema_has_compatible_stats_parsed_missing_column_ok() { - // Checkpoint has "id" column, stats schema needs "other" column (missing in checkpoint is OK) + // Checkpoint has "id" column, stats schema needs "other" column + // Missing column is acceptable - it will return null when accessed let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable( "id", DataType::INTEGER, )]); - let stats_schema = - StructType::new_unchecked([StructField::nullable("other", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("other", DataType::INTEGER)]); - // Missing column in checkpoint is OK - it will just return NULL + // Missing column in checkpoint is OK - it will return null when accessed, + // which is acceptable for data skipping (just means we can't skip based on that column) assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, &stats_schema @@ -2784,7 +2818,7 @@ fn test_schema_has_compatible_stats_parsed_extra_column_ok() { StructField::nullable("extra", DataType::STRING), ]); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, @@ -2797,7 +2831,7 @@ fn test_schema_has_compatible_stats_parsed_no_stats_parsed() { // Checkpoint schema without stats_parsed field let checkpoint_schema = create_checkpoint_schema_without_stats_parsed(); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); assert!(!LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, @@ -2814,7 +2848,7 @@ fn test_schema_has_compatible_stats_parsed_empty_stats_schema() { DataType::INTEGER, )]); - let stats_schema = StructType::new_unchecked([]); + let stats_schema = create_stats_schema(vec![]); // If no columns are needed for data skipping, any stats_parsed is compatible assert!(LogSegment::schema_has_compatible_stats_parsed( @@ -2832,7 +2866,7 @@ fn test_schema_has_compatible_stats_parsed_multiple_columns() { ]); // First column matches, second is incompatible - let stats_schema = StructType::new_unchecked([ + let stats_schema = create_stats_schema(vec![ StructField::nullable("good_col", DataType::LONG), StructField::nullable("bad_col", DataType::INTEGER), ]); @@ -2858,7 +2892,7 @@ fn test_schema_has_compatible_stats_parsed_missing_min_max_values() { let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); // Should return true - missing minValues/maxValues is handled gracefully with 
continue assert!(LogSegment::schema_has_compatible_stats_parsed( @@ -2884,7 +2918,7 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() { let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); // Should return false - minValues/maxValues must be Struct types assert!(!LogSegment::schema_has_compatible_stats_parsed( diff --git a/kernel/src/parallel/sequential_phase.rs b/kernel/src/parallel/sequential_phase.rs index 7150e1503c..21b16ee18d 100644 --- a/kernel/src/parallel/sequential_phase.rs +++ b/kernel/src/parallel/sequential_phase.rs @@ -201,10 +201,11 @@ impl Iterator for SequentialPhase

{ #[cfg(test)] mod tests { use super::*; + use crate::actions::get_log_add_schema; + use crate::log_segment::CheckpointReadInfo; use crate::scan::log_replay::ScanLogReplayProcessor; use crate::scan::state_info::StateInfo; use crate::utils::test_utils::{assert_result_error_with_message, load_test_table}; - use std::sync::Arc; /// Core helper function to verify sequential processing with expected adds and sidecars. fn verify_sequential_processing( @@ -222,7 +223,13 @@ mod tests { (), )?); - let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + // Use base log add schema for tests - no stats_parsed optimization + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; + + let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?; let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?; // Process all batches and collect Add file paths @@ -313,7 +320,13 @@ mod tests { (), )?); - let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + // Use base log add schema for tests - no stats_parsed optimization + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; + + let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?; let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?; // Call next() once but don't exhaust the iterator diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index 3d69898761..1fc6ffcf5b 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -3,7 +3,6 @@ use std::sync::{Arc, LazyLock}; use tracing::{debug, error}; -use crate::actions::get_log_add_schema; use crate::actions::visitors::SelectionVectorVisitor; use crate::error::DeltaResult; use crate::expressions::{ @@ -14,14 +13,13 @@ use crate::expressions::{ use crate::kernel_predicates::{ DataSkippingPredicateEvaluator, KernelPredicateEvaluator, KernelPredicateEvaluatorDefaults, }; -use crate::schema::{DataType, SchemaRef, SchemaTransform, StructField, StructType}; +use crate::schema::{DataType, SchemaRef}; use crate::{ Engine, EngineData, ExpressionEvaluator, JsonHandler, PredicateEvaluator, RowVisitor as _, }; pub(crate) mod stats_schema; -use stats_schema::{NullCountStatsTransform, NullableStatsTransform}; #[cfg(test)] mod tests; @@ -55,46 +53,50 @@ fn as_sql_data_skipping_predicate(pred: &Pred) -> Option { pub(crate) struct DataSkippingFilter { stats_schema: SchemaRef, select_stats_evaluator: Arc, + /// Evaluator for extracting stats_parsed from checkpoints. + /// Only present when the checkpoint has compatible pre-parsed stats. + select_stats_parsed_evaluator: Option>, skipping_evaluator: Arc, filter_evaluator: Arc, json_handler: Arc, } impl DataSkippingFilter { - /// Creates a new data skipping filter. Returns None if there is no predicate, or the predicate - /// is ineligible for data skipping. + /// Creates a new data skipping filter. Returns None if there is no predicate/stats_schema, + /// or the predicate is ineligible for data skipping. /// /// NOTE: None is equivalent to a trivial filter that always returns TRUE (= keeps all files), /// but using an Option lets the engine easily avoid the overhead of applying trivial filters. 
+ /// + /// `checkpoint_read_schema` is the schema used to read checkpoint files, which includes + /// `stats_parsed` for data skipping optimization. This schema is needed to create the + /// evaluator that extracts stats_parsed from the actions. + /// + /// `has_compatible_stats_parsed` indicates whether the checkpoint has compatible pre-parsed + /// stats. When true, checkpoint batches use stats_parsed directly instead of parsing JSON. pub(crate) fn new( engine: &dyn Engine, - physical_predicate: Option<(PredicateRef, SchemaRef)>, + predicate: Option, + stats_schema: Option, + checkpoint_read_schema: SchemaRef, + has_compatible_stats_parsed: bool, ) -> Option { static STATS_EXPR: LazyLock = LazyLock::new(|| Arc::new(column_expr!("add.stats"))); + static STATS_PARSED_EXPR: LazyLock = + LazyLock::new(|| Arc::new(column_expr!("add.stats_parsed"))); static FILTER_PRED: LazyLock = LazyLock::new(|| Arc::new(column_expr!("output").distinct(Expr::literal(false)))); - let (predicate, referenced_schema) = physical_predicate?; + let predicate = predicate?; + let stats_schema = stats_schema?; debug!("Creating a data skipping filter for {:#?}", predicate); - let stats_schema = NullableStatsTransform - .transform_struct(&referenced_schema)? - .into_owned(); - - let nullcount_schema = NullCountStatsTransform - .transform_struct(&stats_schema)? - .into_owned(); - let stats_schema = Arc::new(StructType::new_unchecked([ - StructField::nullable("numRecords", DataType::LONG), - StructField::nullable("nullCount", nullcount_schema), - StructField::nullable("minValues", stats_schema.clone()), - StructField::nullable("maxValues", stats_schema), - ])); - // Skipping happens in several steps: // - // 1. The stats selector fetches add.stats from the metadata + // 1. The stats selector fetches add.stats or add.stats_parsed from the metadata. + // For checkpoint batches with compatible stats_parsed, we use stats_parsed directly. + // Otherwise, we parse add.stats (JSON string) to a stats struct. // // 2. The predicate (skipping evaluator) produces false for any file whose stats prove we // can safely skip it. A value of true means the stats say we must keep the file, and @@ -106,7 +108,7 @@ impl DataSkippingFilter { let select_stats_evaluator = engine .evaluation_handler() .new_expression_evaluator( - get_log_add_schema().clone(), + checkpoint_read_schema.clone(), STATS_EXPR.clone(), DataType::STRING, ) @@ -115,6 +117,23 @@ impl DataSkippingFilter { .inspect_err(|e| error!("Failed to create select stats evaluator: {e}")) .ok()?; + // Only create stats_parsed evaluator when checkpoint has compatible pre-parsed stats + let select_stats_parsed_evaluator = if has_compatible_stats_parsed { + engine + .evaluation_handler() + .new_expression_evaluator( + checkpoint_read_schema.clone(), + STATS_PARSED_EXPR.clone(), + DataType::Struct(Box::new(stats_schema.as_ref().clone())), + ) + .inspect_err(|e| { + debug!("stats_parsed evaluator not available (falling back to JSON): {e}") + }) + .ok() + } else { + None + }; + let skipping_evaluator = engine .evaluation_handler() .new_predicate_evaluator( @@ -137,6 +156,7 @@ impl DataSkippingFilter { Some(Self { stats_schema, select_stats_evaluator, + select_stats_parsed_evaluator, skipping_evaluator, filter_evaluator, json_handler: engine.json_handler(), @@ -145,24 +165,48 @@ impl DataSkippingFilter { /// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector /// which can be applied to the actions to find those that passed data skipping. 
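Editor's note: a hedged sketch of the new construction/usage pattern taken together. The `skip_files` helper and its parameters are illustrative only; `DataSkippingFilter::new`, `DataSkippingFilter::apply`, `build_stats_schema`, and `CheckpointReadInfo` are the items changed in this diff.

```rust
use crate::expressions::PredicateRef;
use crate::log_segment::CheckpointReadInfo;
use crate::scan::data_skipping::stats_schema::build_stats_schema;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::schema::StructType;
use crate::{DeltaResult, Engine, EngineData};

/// Illustrative only: build a filter from the new inputs and apply it to one commit
/// batch and one checkpoint batch.
fn skip_files(
    engine: &dyn Engine,
    predicate: Option<PredicateRef>,
    referenced_schema: &StructType,
    checkpoint_info: &CheckpointReadInfo,
    commit_batch: &dyn EngineData,
    checkpoint_batch: &dyn EngineData,
) -> DeltaResult<(Vec<bool>, Vec<bool>)> {
    let filter = DataSkippingFilter::new(
        engine,
        predicate,
        build_stats_schema(referenced_schema),
        checkpoint_info.checkpoint_read_schema.clone(),
        checkpoint_info.has_stats_parsed,
    );
    match filter {
        // Commit batches carry JSON stats in `add.stats`; checkpoint batches use
        // `add.stats_parsed` directly when the parsed-stats evaluator was created.
        Some(filter) => Ok((
            filter.apply(commit_batch, true)?,
            filter.apply(checkpoint_batch, false)?,
        )),
        // No predicate (or no stats schema): trivial filter, keep every row.
        None => Ok((
            vec![true; commit_batch.len()],
            vec![true; checkpoint_batch.len()],
        )),
    }
}
```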
- pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult> { - // retrieve and parse stats from actions data - let stats = self.select_stats_evaluator.evaluate(actions)?; - assert_eq!(stats.len(), actions.len()); - let parsed_stats = self - .json_handler - .parse_json(stats, self.stats_schema.clone())?; - assert_eq!(parsed_stats.len(), actions.len()); - - // evaluate the predicate on the parsed stats, then convert to selection vector - let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?; + /// + /// `is_log_batch` indicates whether this batch is from a commit log (`true`) or checkpoint (`false`). + /// Checkpoint batches may have pre-parsed stats (`stats_parsed`) that can be used directly + /// instead of parsing JSON. Commit batches only have JSON stats. + pub(crate) fn apply( + &self, + actions: &dyn EngineData, + is_log_batch: bool, + ) -> DeltaResult> { + // Get the final stats by either: + // 1. Using stats_parsed directly (for checkpoint batches with compatible stats) + // 2. Parsing JSON (for commit batches or when stats_parsed unavailable) + let final_stats = if let Some(stats_parsed_evaluator) = (!is_log_batch) + .then_some(()) + .and(self.select_stats_parsed_evaluator.as_ref()) + { + // Checkpoint batch with compatible stats_parsed - use it directly + let stats_parsed = stats_parsed_evaluator.evaluate(actions)?; + debug!( + "Using stats_parsed from checkpoint ({} rows)", + stats_parsed.len() + ); + stats_parsed + } else { + // Commit batch or no stats_parsed evaluator - parse JSON + let stats_json = self.select_stats_evaluator.evaluate(actions)?; + assert_eq!(stats_json.len(), actions.len()); + self.json_handler + .parse_json(stats_json, self.stats_schema.clone())? + }; + + assert_eq!(final_stats.len(), actions.len()); + + // Evaluate predicate on the stats + let skipping_predicate = self.skipping_evaluator.evaluate(&*final_stats)?; assert_eq!(skipping_predicate.len(), actions.len()); let selection_vector = self .filter_evaluator .evaluate(skipping_predicate.as_ref())?; assert_eq!(selection_vector.len(), actions.len()); - // visit the engine's selection vector to produce a Vec + // Visit the engine's selection vector to produce a Vec let mut visitor = SelectionVectorVisitor::default(); visitor.visit_rows_of(selection_vector.as_ref())?; Ok(visitor.selection_vector) diff --git a/kernel/src/scan/data_skipping/stats_schema.rs b/kernel/src/scan/data_skipping/stats_schema.rs index e18423b467..df0f1dfb5e 100644 --- a/kernel/src/scan/data_skipping/stats_schema.rs +++ b/kernel/src/scan/data_skipping/stats_schema.rs @@ -1,11 +1,12 @@ //! This module contains logic to compute the expected schema for file statistics use std::borrow::Cow; +use std::sync::Arc; use crate::{ schema::{ - ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaTransform, - StructField, StructType, + ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaRef, + SchemaTransform, StructField, StructType, }, table_properties::{DataSkippingNumIndexedCols, TableProperties}, DeltaResult, @@ -133,6 +134,28 @@ pub(crate) fn expected_stats_schema( StructType::try_new(fields) } +/// Creates a stats schema from a referenced schema (columns from predicate). +/// Returns schema: `{ numRecords, nullCount, minValues, maxValues }` +/// +/// This is used to build the schema for parsing JSON stats and for reading stats_parsed +/// from checkpoints. 
+pub(crate) fn build_stats_schema(referenced_schema: &StructType) -> Option { + let stats_schema = NullableStatsTransform + .transform_struct(referenced_schema)? + .into_owned(); + + let nullcount_schema = NullCountStatsTransform + .transform_struct(&stats_schema)? + .into_owned(); + + Some(Arc::new(StructType::new_unchecked([ + StructField::nullable("numRecords", DataType::LONG), + StructField::nullable("nullCount", nullcount_schema), + StructField::nullable("minValues", stats_schema.clone()), + StructField::nullable("maxValues", stats_schema), + ]))) +} + /// Transforms a schema to make all fields nullable. /// Used for stats schemas where stats may not be available for all columns. pub(crate) struct NullableStatsTransform; @@ -158,7 +181,6 @@ impl<'a> SchemaTransform<'a> for NullableStatsTransform { /// All leaf fields (primitives, arrays, maps, variants) are converted to LONG type /// since null counts are always integers, while struct fields are recursed into /// to preserve the nested structure. -#[allow(unused)] pub(crate) struct NullCountStatsTransform; impl<'a> SchemaTransform<'a> for NullCountStatsTransform { fn transform_struct_field(&mut self, field: &'a StructField) -> Option> { diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 23ffb1d7f0..a830babcab 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -8,15 +8,15 @@ use super::data_skipping::DataSkippingFilter; use super::state_info::StateInfo; use super::{PhysicalPredicate, ScanMetadata}; use crate::actions::deletion_vector::DeletionVectorDescriptor; -use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef}; use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _}; use crate::log_replay::deduplicator::Deduplicator; use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor}; +use crate::log_segment::CheckpointReadInfo; use crate::scan::Scalar; use crate::schema::ToSchema as _; -use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType}; +use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType}; use crate::table_features::ColumnMappingMode; use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec}; use crate::utils::require; @@ -32,6 +32,7 @@ struct InternalScanState { predicate_schema: Option>, transform_spec: Option>, column_mapping_mode: ColumnMappingMode, + stats_schema: Option, } /// Public-facing serialized processor state for distributed processing. @@ -45,6 +46,8 @@ pub struct SerializableScanState { pub internal_state_blob: Vec, /// Set of file action keys that have already been processed. pub seen_file_keys: HashSet, + /// Information about checkpoint reading for data skipping optimization + pub(crate) checkpoint_info: CheckpointReadInfo, } /// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan. 
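Editor's note: a small sketch of the schema `build_stats_schema` yields for a predicate referencing a single nullable `id: INTEGER` column (the wrapper function is illustrative only). This is the same shape that `StateInfo` now caches in `stats_schema` and that the filter uses both to parse JSON stats and to read `stats_parsed`.

```rust
use crate::scan::data_skipping::stats_schema::build_stats_schema;
use crate::schema::{DataType, SchemaRef, StructField, StructType};

/// Illustrative only: the stats schema produced for a predicate that references a
/// single nullable INTEGER column `id`.
fn example_stats_schema() -> Option<SchemaRef> {
    let referenced_schema =
        StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
    // Expected shape, per the transforms above:
    //   numRecords: long
    //   nullCount:  { id: long }
    //   minValues:  { id: int }
    //   maxValues:  { id: int }
    build_stats_schema(&referenced_schema)
}
```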
@@ -91,8 +94,12 @@ impl ScanLogReplayProcessor { const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns /// Create a new [`ScanLogReplayProcessor`] instance - pub(crate) fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { - Self::new_with_seen_files(engine, state_info, Default::default()) + pub(crate) fn new( + engine: &dyn Engine, + state_info: Arc, + checkpoint_info: CheckpointReadInfo, + ) -> DeltaResult { + Self::new_with_seen_files(engine, state_info, checkpoint_info, Default::default()) } /// Create new [`ScanLogReplayProcessor`] with pre-populated seen_file_keys. @@ -103,35 +110,38 @@ impl ScanLogReplayProcessor { /// # Parameters /// - `engine`: Engine for creating evaluators and filters /// - `state_info`: StateInfo containing schemas, transforms, and predicates + /// - `checkpoint_info`: Information about checkpoint reading for data skipping /// - `seen_file_keys`: Pre-computed set of file action keys that have been seen pub(crate) fn new_with_seen_files( engine: &dyn Engine, state_info: Arc, + checkpoint_info: CheckpointReadInfo, seen_file_keys: HashSet, ) -> DeltaResult { - // Extract the physical predicate from StateInfo's PhysicalPredicate enum. - // The DataSkippingFilter and partition_filter components expect the predicate - // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from - // the enum representation to the tuple format. - let physical_predicate = match &state_info.physical_predicate { - PhysicalPredicate::Some(predicate, schema) => { - // Valid predicate that can be used for data skipping and partition filtering - Some((predicate.clone(), schema.clone())) - } + // Extract the predicate from StateInfo's PhysicalPredicate enum. + let predicate = match &state_info.physical_predicate { + PhysicalPredicate::Some(predicate, _) => Some(predicate.clone()), PhysicalPredicate::StaticSkipAll => { debug_assert!(false, "StaticSkipAll case should be handled at a higher level and not reach this code"); None } - PhysicalPredicate::None => { - // No predicate provided - None - } + PhysicalPredicate::None => None, }; + let CheckpointReadInfo { + has_stats_parsed, + checkpoint_read_schema, + } = checkpoint_info; Ok(Self { - partition_filter: physical_predicate.as_ref().map(|(e, _)| e.clone()), - data_skipping_filter: DataSkippingFilter::new(engine, physical_predicate), + partition_filter: predicate.clone(), + data_skipping_filter: DataSkippingFilter::new( + engine, + predicate, + state_info.stats_schema.clone(), + checkpoint_read_schema.clone(), + has_stats_parsed, + ), add_transform: engine.evaluation_handler().new_expression_evaluator( - get_log_add_schema().clone(), + checkpoint_read_schema, get_add_transform_expr(), SCAN_ROW_DATATYPE.clone(), )?, @@ -155,13 +165,17 @@ impl ScanLogReplayProcessor { /// undefined behaviour! 
#[internal_api] #[allow(unused)] - pub(crate) fn into_serializable_state(self) -> DeltaResult { + pub(crate) fn into_serializable_state( + self, + checkpoint_info: CheckpointReadInfo, + ) -> DeltaResult { let StateInfo { logical_schema, physical_schema, physical_predicate, transform_spec, column_mapping_mode, + stats_schema, } = self.state_info.as_ref().clone(); // Extract predicate from PhysicalPredicate @@ -177,17 +191,17 @@ impl ScanLogReplayProcessor { transform_spec, predicate_schema, column_mapping_mode, + stats_schema, }; let internal_state_blob = serde_json::to_vec(&internal_state) .map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?; - let state = SerializableScanState { + Ok(SerializableScanState { predicate, internal_state_blob, seen_file_keys: self.seen_file_keys, - }; - - Ok(state) + checkpoint_info, + }) } /// Reconstruct a processor from serialized state. @@ -232,9 +246,15 @@ impl ScanLogReplayProcessor { physical_predicate, transform_spec: internal_state.transform_spec, column_mapping_mode: internal_state.column_mapping_mode, + stats_schema: internal_state.stats_schema, }); - let processor = Self::new_with_seen_files(engine, state_info, state.seen_file_keys)?; + let processor = Self::new_with_seen_files( + engine, + state_info, + state.checkpoint_info, + state.seen_file_keys, + )?; Ok(Arc::new(processor)) } @@ -504,7 +524,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor { // Build an initial selection vector for the batch which has had the data skipping filter // applied. The selection vector is further updated by the deduplication visitor to remove // rows that are not valid adds. - let selection_vector = self.build_selection_vector(actions.as_ref())?; + let selection_vector = self.build_selection_vector(actions.as_ref(), is_log_batch)?; assert_eq!(selection_vector.len(), actions.len()); let deduplicator = FileActionDeduplicator::new( @@ -549,8 +569,12 @@ pub(crate) fn scan_action_iter( engine: &dyn Engine, action_iter: impl Iterator>, state_info: Arc, + checkpoint_info: CheckpointReadInfo, ) -> DeltaResult>> { - Ok(ScanLogReplayProcessor::new(engine, state_info)?.process_actions_iter(action_iter)) + Ok( + ScanLogReplayProcessor::new(engine, state_info, checkpoint_info)? 
+ .process_actions_iter(action_iter), + ) } #[cfg(test)] @@ -558,10 +582,11 @@ mod tests { use std::collections::{HashMap, HashSet}; use std::sync::Arc; - use crate::actions::get_commit_schema; + use crate::actions::{get_commit_schema, get_log_add_schema}; use crate::engine::sync::SyncEngine; use crate::expressions::{BinaryExpressionOp, Scalar}; use crate::log_replay::ActionsBatch; + use crate::log_segment::CheckpointReadInfo; use crate::scan::state::ScanFile; use crate::scan::state_info::tests::{ assert_transform_spec, get_simple_state_info, get_state_info, @@ -634,13 +659,19 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), state_info, + checkpoint_info, ) .unwrap(); for res in iter { @@ -661,12 +692,17 @@ mod tests { let partition_cols = vec!["date".to_string()]; let state_info = get_simple_state_info(schema, partition_cols).unwrap(); let batch = vec![add_batch_with_partition_col()]; + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), Arc::new(state_info), + checkpoint_info, ) .unwrap(); @@ -739,12 +775,17 @@ mod tests { ); let batch = vec![add_batch_for_row_id(get_commit_schema().clone())]; + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), Arc::new(state_info), + checkpoint_info, ) .unwrap(); @@ -784,9 +825,14 @@ mod tests { StructField::new("id", DataType::INTEGER, true), StructField::new("value", DataType::STRING, true), ])); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let mut processor = ScanLogReplayProcessor::new( &engine, Arc::new(get_simple_state_info(schema.clone(), vec![]).unwrap()), + checkpoint_info.clone(), ) .unwrap(); @@ -801,7 +847,7 @@ mod tests { let state_info = processor.state_info.clone(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); @@ -834,6 +880,10 @@ mod tests { StructField::new("id", DataType::INTEGER, true), StructField::new("value", DataType::STRING, true), ])); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let predicate = Arc::new(crate::expressions::Predicate::eq( Expr::column(["id"]), Expr::literal(10i32), @@ -852,10 +902,12 @@ mod tests { PhysicalPredicate::Some(_, s) => s.clone(), _ => panic!("Expected predicate"), }; - let processor = ScanLogReplayProcessor::new(&engine, state_info.clone()).unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info.clone(), checkpoint_info.clone()) + .unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + 
processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); @@ -876,6 +928,10 @@ mod tests { StructField::new("value", DataType::INTEGER, true), StructField::new("date", DataType::DATE, true), ])); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let state_info = Arc::new( get_state_info( schema, @@ -897,10 +953,12 @@ mod tests { ); let original_transform = state_info.transform_spec.clone(); assert!(original_transform.is_some()); - let processor = ScanLogReplayProcessor::new(&engine, state_info.clone()).unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info.clone(), checkpoint_info.clone()) + .unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); assert_eq!(deserialized.state_info.transform_spec, original_transform); @@ -915,6 +973,10 @@ mod tests { ColumnMappingMode::Id, ColumnMappingMode::Name, ] { + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( "id", DataType::INTEGER, @@ -926,11 +988,13 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: None, column_mapping_mode: mode, + stats_schema: None, }); - let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone()).unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); assert_eq!(deserialized.state_info.column_mapping_mode, mode); @@ -941,6 +1005,10 @@ mod tests { fn test_serialization_edge_cases() { // Test edge cases: empty seen_file_keys, no predicate, no transform_spec let engine = SyncEngine::new(); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( "id", DataType::INTEGER, @@ -952,9 +1020,11 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }); - let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); - let serialized = processor.into_serializable_state().unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone()).unwrap(); + let serialized = processor.into_serializable_state(checkpoint_info).unwrap(); assert!(serialized.predicate.is_none()); let deserialized = ScanLogReplayProcessor::from_serializable_state(&engine, serialized).unwrap(); @@ -966,10 +1036,15 @@ mod tests { fn test_serialization_invalid_json() { // Test that invalid JSON blobs are properly rejected let engine = SyncEngine::new(); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let invalid_state = SerializableScanState { predicate: None, internal_state_blob: vec![0, 1, 2, 3, 255], // Invalid JSON seen_file_keys: HashSet::new(), + checkpoint_info, }; assert!(ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state).is_err()); } @@ -978,6 
+1053,10 @@ mod tests { fn test_serialization_missing_predicate_schema() { // Test that missing predicate_schema when predicate exists is detected let engine = SyncEngine::new(); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( "id", DataType::INTEGER, @@ -989,6 +1068,7 @@ mod tests { predicate_schema: None, // Missing! transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }; let predicate = Arc::new(crate::expressions::Predicate::column(["id"])); let invalid_blob = serde_json::to_vec(&invalid_internal_state).unwrap(); @@ -996,6 +1076,7 @@ mod tests { predicate: Some(predicate), // Predicate exists but schema is None internal_state_blob: invalid_blob, seen_file_keys: HashSet::new(), + checkpoint_info, }; let result = ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state); assert!(result.is_err()); @@ -1017,6 +1098,7 @@ mod tests { predicate_schema: None, transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }; let blob = serde_json::to_string(&invalid_internal_state).unwrap(); let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap(); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 05d9dde129..22b1c33322 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -20,7 +20,7 @@ use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Sca use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResolver}; use crate::listed_log_files::ListedLogFilesBuilder; use crate::log_replay::{ActionsBatch, HasSelectionVector}; -use crate::log_segment::LogSegment; +use crate::log_segment::{ActionsWithCheckpointInfo, CheckpointReadInfo, LogSegment}; use crate::scan::log_replay::{BASE_ROW_ID_NAME, CLUSTERING_PROVIDER_NAME}; use crate::scan::state_info::StateInfo; use crate::schema::{ @@ -435,7 +435,8 @@ impl Scan { &self, engine: &dyn Engine, ) -> DeltaResult>> { - self.scan_metadata_inner(engine, self.replay_for_scan_metadata(engine)?) + let actions_with_checkpoint_info = self.replay_for_scan_metadata(engine)?; + self.scan_metadata_inner(engine, actions_with_checkpoint_info) } /// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s. @@ -510,15 +511,25 @@ impl Scan { Ok(ActionsBatch::new(transform.evaluate(data.as_ref())?, false)) }; + let log_segment = self.snapshot.log_segment(); + // If the snapshot version corresponds to the hint version, we process the existing data // to apply file skipping and provide the required transformations. + // Since we're only processing existing data (no checkpoint), we use the base schema + // and no stats_parsed optimization. 
if existing_version == self.snapshot.version() { - let scan = existing_data.into_iter().map(apply_transform); - return Ok(Box::new(self.scan_metadata_inner(engine, scan)?)); + let actions_with_checkpoint_info = ActionsWithCheckpointInfo { + actions: existing_data.into_iter().map(apply_transform), + checkpoint_info: CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: CHECKPOINT_READ_SCHEMA.clone(), + }, + }; + return Ok(Box::new( + self.scan_metadata_inner(engine, actions_with_checkpoint_info)?, + )); } - let log_segment = self.snapshot.log_segment(); - // If the current log segment contains a checkpoint newer than the hint version // we disregard the existing data hint, and perform a full scan. The current log segment // only has deltas after the checkpoint, so we cannot update from prior versions. @@ -544,27 +555,44 @@ impl Scan { None, // No checkpoint in this incremental segment )?; - let it = new_log_segment - .read_actions_with_projected_checkpoint_actions( - engine, - COMMIT_READ_SCHEMA.clone(), - CHECKPOINT_READ_SCHEMA.clone(), - None, - )? - .chain(existing_data.into_iter().map(apply_transform)); + // For incremental reads, new_log_segment has no checkpoint but we use the + // checkpoint schema returned by the function for consistency. + let result = new_log_segment.read_actions_with_projected_checkpoint_actions( + engine, + COMMIT_READ_SCHEMA.clone(), + CHECKPOINT_READ_SCHEMA.clone(), + None, + self.state_info.stats_schema.as_ref().map(|s| s.as_ref()), + )?; + let actions_with_checkpoint_info = ActionsWithCheckpointInfo { + actions: result + .actions + .chain(existing_data.into_iter().map(apply_transform)), + checkpoint_info: result.checkpoint_info, + }; - Ok(Box::new(self.scan_metadata_inner(engine, it)?)) + Ok(Box::new(self.scan_metadata_inner( + engine, + actions_with_checkpoint_info, + )?)) } fn scan_metadata_inner( &self, engine: &dyn Engine, - action_batch_iter: impl Iterator>, + actions_with_checkpoint_info: ActionsWithCheckpointInfo< + impl Iterator>, + >, ) -> DeltaResult>> { if let PhysicalPredicate::StaticSkipAll = self.state_info.physical_predicate { return Ok(None.into_iter().flatten()); } - let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone())?; + let it = scan_action_iter( + engine, + actions_with_checkpoint_info.actions, + self.state_info.clone(), + actions_with_checkpoint_info.checkpoint_info, + )?; Ok(Some(it).into_iter().flatten()) } @@ -572,7 +600,9 @@ impl Scan { fn replay_for_scan_metadata( &self, engine: &dyn Engine, - ) -> DeltaResult> + Send> { + ) -> DeltaResult< + ActionsWithCheckpointInfo> + Send>, + > { // NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping // when ~every checkpoint file will contain the adds and removes we are looking for. 
self.snapshot @@ -582,6 +612,7 @@ impl Scan { COMMIT_READ_SCHEMA.clone(), CHECKPOINT_READ_SCHEMA.clone(), None, + self.state_info.stats_schema.as_ref().map(|s| s.as_ref()), ) } diff --git a/kernel/src/scan/state_info.rs b/kernel/src/scan/state_info.rs index 303ec73d2c..9af8f9f9e2 100644 --- a/kernel/src/scan/state_info.rs +++ b/kernel/src/scan/state_info.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use tracing::debug; +use crate::scan::data_skipping::stats_schema::build_stats_schema; use crate::scan::field_classifiers::TransformFieldClassifier; use crate::scan::PhysicalPredicate; use crate::schema::{DataType, MetadataColumnSpec, SchemaRef, StructType}; @@ -27,6 +28,9 @@ pub(crate) struct StateInfo { pub(crate) transform_spec: Option>, /// The column mapping mode for this scan pub(crate) column_mapping_mode: ColumnMappingMode, + /// The stats schema for data skipping (built from predicate columns). + /// Used to construct checkpoint read schema with stats_parsed. + pub(crate) stats_schema: Option, } /// Validating the metadata columns also extracts information needed to properly construct the full @@ -203,6 +207,12 @@ impl StateInfo { None => PhysicalPredicate::None, }; + // Build stats schema from predicate columns for data skipping + let stats_schema = match &physical_predicate { + PhysicalPredicate::Some(_, schema) => build_stats_schema(schema), + _ => None, + }; + let transform_spec = if !transform_spec.is_empty() || column_mapping_mode != ColumnMappingMode::None { Some(Arc::new(transform_spec)) @@ -216,6 +226,7 @@ impl StateInfo { physical_predicate, transform_spec, column_mapping_mode, + stats_schema, }) } } diff --git a/kernel/src/scan/test_utils.rs b/kernel/src/scan/test_utils.rs index e11a605b85..9ced9cc717 100644 --- a/kernel/src/scan/test_utils.rs +++ b/kernel/src/scan/test_utils.rs @@ -6,8 +6,9 @@ use itertools::Itertools; use std::sync::Arc; use crate::log_replay::ActionsBatch; +use crate::log_segment::CheckpointReadInfo; use crate::{ - actions::get_commit_schema, + actions::{get_commit_schema, get_log_add_schema}, engine::{ arrow_data::ArrowEngineData, sync::{json::SyncJsonHandler, SyncEngine}, @@ -146,13 +147,19 @@ pub(crate) fn run_with_validate_callback( physical_predicate: PhysicalPredicate::None, transform_spec, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), state_info, + checkpoint_info, ) .unwrap(); let mut batch_count = 0; diff --git a/kernel/src/scan/tests.rs b/kernel/src/scan/tests.rs index 4f852e0dbc..4c615a1bfe 100644 --- a/kernel/src/scan/tests.rs +++ b/kernel/src/scan/tests.rs @@ -374,11 +374,8 @@ fn test_replay_for_scan_metadata() { let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let scan = snapshot.scan_builder().build().unwrap(); - let data: Vec<_> = scan - .replay_for_scan_metadata(&engine) - .unwrap() - .try_collect() - .unwrap(); + let result = scan.replay_for_scan_metadata(&engine).unwrap(); + let data: Vec<_> = result.actions.try_collect().unwrap(); // No predicate pushdown attempted, because at most one part of a multi-part checkpoint // could be skipped when looking for adds/removes. 
// diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 8332a854eb..a20f6676b7 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -13,6 +13,7 @@ use crate::actions::{ use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_name, ColumnName}; use crate::path::ParsedLogPath; +use crate::scan::data_skipping::stats_schema::build_stats_schema; use crate::scan::data_skipping::DataSkippingFilter; use crate::scan::state::DvInfo; use crate::schema::{ @@ -57,7 +58,20 @@ pub(crate) fn table_changes_action_iter( table_schema: SchemaRef, physical_predicate: Option<(PredicateRef, SchemaRef)>, ) -> DeltaResult>> { - let filter = DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new); + // For table_changes, we read commit files directly (not checkpoints), so there's no + // stats_parsed available. Build stats_schema from the predicate's referenced schema. + let (predicate, stats_schema) = match physical_predicate { + Some((pred, schema)) => (Some(pred), build_stats_schema(&schema)), + None => (None, None), + }; + let filter = DataSkippingFilter::new( + engine.as_ref(), + predicate, + stats_schema, + get_log_add_schema().clone(), + false, // has_compatible_stats_parsed + ) + .map(Arc::new); let mut current_configuration = start_table_configuration.clone(); let result = commit_files @@ -281,8 +295,9 @@ impl LogReplayScanner { // Apply data skipping to get back a selection vector for actions that passed skipping. // We start our selection vector based on what was filtered. We will add to this vector // below if a file has been removed. Note: None implies all files passed data skipping. + // Table changes always reads from commit files (is_log_batch = true), not checkpoints. 
let selection_vector = match &filter { - Some(filter) => filter.apply(actions.as_ref())?, + Some(filter) => filter.apply(actions.as_ref(), true /* is_log_batch */)?, None => vec![true; actions.len()], }; diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index 25aaed0d74..808e054630 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -183,6 +183,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: Some(Arc::new(transform_spec)), column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, } } @@ -401,6 +402,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: Some(Arc::new(transform_spec)), column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }; let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema); diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..57a5d1011f --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"07b3a271-f0d6-4c3b-a300-3170847ceafe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"3"},"createdTime":1765928274753}} +{"add":{"path":"part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet","partitionValues":{},"size":3103,"modificationTime":1765928276060,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":1,\"name\":\"name_1\",\"age\":20,\"salary\":50100},\"maxValues\":{\"id\":100,\"name\":\"name_99\",\"age\":69,\"salary\":60000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928276979,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"3103"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"e3e4034b-7a93-4d2b-9de4-3fda01735de4"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json new file mode 100644 index 0000000000..5f2597c27f --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet","partitionValues":{},"size":2826,"modificationTime":1765928279084,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":101,\"name\":\"name_101\",\"age\":20,\"salary\":60100},\"maxValues\":{\"id\":200,\"name\":\"name_200\",\"age\":69,\"salary\":70000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} 
+{"commitInfo":{"timestamp":1765928279091,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2826"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"6b5f28e4-a9cb-4c53-8b29-99e13d48a280"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json new file mode 100644 index 0000000000..2829387d25 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-a4c1def5-742e-4248-8c58-fc9f4018e43d-c000.snappy.parquet","partitionValues":{},"size":2828,"modificationTime":1765928279752,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":201,\"name\":\"name_201\",\"age\":20,\"salary\":70100},\"maxValues\":{\"id\":300,\"name\":\"name_300\",\"age\":69,\"salary\":80000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928279759,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2828"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"23695a3f-264a-427d-a3f1-8e0eda4016f5"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.checkpoint.parquet b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.checkpoint.parquet new file mode 100644 index 0000000000..96e4924ab3 Binary files /dev/null and b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.checkpoint.parquet differ diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json new file mode 100644 index 0000000000..8fe41abeea --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet","partitionValues":{},"size":2827,"modificationTime":1765928280364,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":301,\"name\":\"name_301\",\"age\":20,\"salary\":80100},\"maxValues\":{\"id\":400,\"name\":\"name_400\",\"age\":69,\"salary\":90000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928280369,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2827"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"88155a70-c671-4065-9a07-6e1eb3bbad68"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json new file mode 100644 index 0000000000..7d87c00607 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet","partitionValues":{},"size":2824,"modificationTime":1765928281356,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":401,\"name\":\"name_401\",\"age\":20,\"salary\":90100},\"maxValues\":{\"id\":500,\"name\":\"name_500\",\"age\":69,\"salary\":100000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928281364,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2824"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"53ed470b-13c7-437f-829d-9a6704a285a0"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json new file mode 100644 index 0000000000..97cec9fa13 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet","partitionValues":{},"size":2828,"modificationTime":1765928282040,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":501,\"name\":\"name_501\",\"age\":20,\"salary\":100100},\"maxValues\":{\"id\":600,\"name\":\"name_600\",\"age\":69,\"salary\":110000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928282047,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2828"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"e0681515-134d-427b-b265-da87d06ec8c3"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint b/kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint new file mode 100644 index 0000000000..136ec5b697 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint @@ -0,0 +1 @@ 
+{"version":3,"size":6,"sizeInBytes":12170,"numOfAddFiles":4,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"stats","type":"string","nullable":true,"metadata":{}},{"name":"stats_parsed","type":{"type":"struct","fields":[{"name":"numRecords","type":"long","nullable":true,"metadata":{}},{"name":"minValues","type":{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"salary","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"maxValues","type":{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"salary","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"nullCount","type":{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"long","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"salary","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array
","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}} diff --git a/kernel/tests/data/parsed-stats/part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet new file mode 100644 index 0000000000..b47546b961 Binary files /dev/null and b/kernel/tests/data/parsed-stats/part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet differ diff --git a/kernel/tests/data/parsed-stats/part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet new file mode 100644 index 0000000000..cc425f7519 Binary files /dev/null and b/kernel/tests/data/parsed-stats/part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet differ diff --git a/kernel/tests/data/parsed-stats/part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet new file mode 100644 index 0000000000..e25b8a16c1 Binary files /dev/null and b/kernel/tests/data/parsed-stats/part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet differ diff --git a/kernel/tests/data/parsed-stats/part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet new file mode 100644 index 0000000000..d7ac169ee0 Binary files /dev/null and b/kernel/tests/data/parsed-stats/part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet differ diff --git a/kernel/tests/data/parsed-stats/part-00000-a4c1def5-742e-4248-8c58-fc9f4018e43d-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-a4c1def5-742e-4248-8c58-fc9f4018e43d-c000.snappy.parquet new file mode 100644 index 0000000000..5199afdc9f Binary files /dev/null and b/kernel/tests/data/parsed-stats/part-00000-a4c1def5-742e-4248-8c58-fc9f4018e43d-c000.snappy.parquet differ diff --git a/kernel/tests/data/parsed-stats/part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet new file mode 100644 index 0000000000..4cdf6bfc7e Binary files /dev/null and b/kernel/tests/data/parsed-stats/part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet differ diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index ef1372713e..bfce625fa4 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -1701,3 +1701,91 @@ async fn test_invalid_files_are_skipped() -> Result<(), Box Result<(), Box> { + let _ = tracing_subscriber::fmt::try_init(); + let path = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/"))?; + let url = url::Url::from_directory_path(path).unwrap(); + let engine = test_utils::create_default_engine(&url)?; + + let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?; + + // Test 1: Predicate that should skip all 
files (id > 700) + // All files have max id of 600, so no files should match + let predicate = Pred::gt(column_expr!("id"), Expr::literal(700i64)); + let scan = snapshot + .clone() + .scan_builder() + .with_predicate(Arc::new(predicate)) + .build()?; + + let files_scanned: usize = scan.execute(engine.clone())?.count(); + assert_eq!( + files_scanned, 0, + "Expected 0 files when id > 700 (all files have max id 600)" + ); + + // Test 2: Predicate that should return only first file (id < 50) + // Only file 1 has ids 1-100 and min < 50 + let predicate = Pred::lt(column_expr!("id"), Expr::literal(50i64)); + let scan = snapshot + .clone() + .scan_builder() + .with_predicate(Arc::new(predicate)) + .build()?; + + let mut files_scanned = 0; + for _data in scan.execute(engine.clone())? { + files_scanned += 1; + } + assert_eq!( + files_scanned, 1, + "Expected 1 file when id < 50 (only file 1 has min id 1)" + ); + + // Test 3: Predicate using salary column (salary > 105000) + // Only file 6 has salary range 100100-110000, with max 110000 + let predicate = Pred::gt(column_expr!("salary"), Expr::literal(105000i64)); + let scan = snapshot + .clone() + .scan_builder() + .with_predicate(Arc::new(predicate)) + .build()?; + + let mut files_scanned = 0; + for _data in scan.execute(engine.clone())? { + files_scanned += 1; + } + assert_eq!( + files_scanned, 1, + "Expected 1 file when salary > 105000 (only file 6 has salary up to 110000)" + ); + + // Test 4: Predicate that matches multiple files (id > 350) + // Files 4, 5, 6 have ids starting from 301, 401, 501 + let predicate = Pred::gt(column_expr!("id"), Expr::literal(350i64)); + let scan = snapshot + .scan_builder() + .with_predicate(Arc::new(predicate)) + .build()?; + + let mut files_scanned = 0; + for _data in scan.execute(engine)? { + files_scanned += 1; + } + assert_eq!( + files_scanned, 3, + "Expected 3 files when id > 350 (files 4, 5, 6 have ids > 350)" + ); + + Ok(()) +}
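One possible extension to this test (a suggestion only, not part of the patch): every file in the parsed-stats fixture has age spanning 20 to 69, so a predicate that the min/max stats cannot rule out should leave all six add files unpruned. The snippet reuses exactly the constructs already used in `test_parsed_stats_data_skipping` and, like the cases above, assumes one result batch per data file; because the final case moves `snapshot` and `engine`, it would either run before that case or keep the clones shown.

```rust
// Hypothetical additional case: age is 20..=69 in every file, so `age > 19`
// cannot eliminate any file via min/max stats -- all 6 adds should be scanned.
let predicate = Pred::gt(column_expr!("age"), Expr::literal(19i64));
let scan = snapshot
    .clone()
    .scan_builder()
    .with_predicate(Arc::new(predicate))
    .build()?;
let files_scanned: usize = scan.execute(engine.clone())?.count();
assert_eq!(
    files_scanned, 6,
    "Expected all 6 files when age > 19 (no file's stats can rule it out)"
);
```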
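Stepping back to the table_changes and log_replay hunks earlier in the patch: the new `is_log_batch` argument to `DataSkippingFilter::apply`, together with `CheckpointReadInfo::has_stats_parsed`, is what lets checkpoint batches take the pre-parsed-stats path while commit batches keep parsing the JSON `add.stats` string. The snippet below is an illustration of that decision only, not the kernel's actual `apply` implementation (which lives in `scan/data_skipping`); the helper name `stats_source` is invented for the example.

```rust
/// Illustration only: which stats source a skipping filter would consult for a
/// batch. Commit batches always carry JSON stats; checkpoint batches may carry a
/// pre-parsed `add.stats_parsed` struct when the checkpoint schema is compatible.
fn stats_source(is_log_batch: bool, has_compatible_stats_parsed: bool) -> &'static str {
    if !is_log_batch && has_compatible_stats_parsed {
        "add.stats_parsed" // structured stats read straight from the checkpoint
    } else {
        "add.stats" // JSON string, parsed per batch as before
    }
}

fn main() {
    // table_changes feeds only commit batches, hence apply(.., true /* is_log_batch */)
    // and has_compatible_stats_parsed = false in its filter construction.
    assert_eq!(stats_source(true, false), "add.stats");
    // A checkpoint batch read with a stats_parsed-bearing schema takes the fast path.
    assert_eq!(stats_source(false, true), "add.stats_parsed");
}
```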