From fdd657e13358f91d62242e86237cc6934994117f Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Fri, 23 Jan 2026 06:10:20 +0000 Subject: [PATCH 1/3] has_compat --- kernel/src/log_segment.rs | 191 ++++++++++++++---- kernel/src/log_segment/tests.rs | 73 +++++-- kernel/src/scan/data_skipping/stats_schema.rs | 27 ++- kernel/src/scan/log_replay.rs | 11 +- kernel/src/scan/mod.rs | 26 ++- kernel/src/scan/state_info.rs | 11 + kernel/src/scan/test_utils.rs | 1 + kernel/src/scan/tests.rs | 7 +- .../src/table_changes/physical_to_logical.rs | 2 + 9 files changed, 267 insertions(+), 82 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 0a4aca7a34..75b7476019 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -36,6 +36,32 @@ use url::Url; #[cfg(test)] mod tests; +/// Information about checkpoint reading for data skipping optimization. +/// +/// Returned alongside the actions iterator from checkpoint reading functions. +#[derive(Debug, Clone)] +pub(crate) struct CheckpointReadInfo { + /// Whether the checkpoint has compatible pre-parsed stats for data skipping. + /// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON. + #[allow(unused)] + pub has_stats_parsed: bool, + /// The schema used to read checkpoint files, potentially including stats_parsed. + #[allow(unused)] + pub checkpoint_read_schema: SchemaRef, +} + +/// Result of reading actions from a log segment, containing both the actions iterator +/// and checkpoint metadata. +/// +/// This struct provides named access to the return values instead of tuple indexing. +pub(crate) struct ActionsWithCheckpointInfo { + /// Iterator over action batches read from the log segment. + pub actions: A, + /// Metadata about checkpoint reading, including the schema used. + #[allow(unused)] + pub checkpoint_info: CheckpointReadInfo, +} + /// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files /// and commit files and guarantees the following: /// 1. Commit file versions will not have any gaps between them. @@ -384,6 +410,11 @@ impl LogSegment { /// /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the /// query's predicate, but rather a predicate for filtering log files themselves. + /// Read a stream of actions from this log segment. This returns an iterator of + /// [`ActionsBatch`]s which includes EngineData of actions + a boolean flag indicating whether + /// the data was read from a commit file (true) or a checkpoint file (false). + /// + /// Also returns `CheckpointReadInfo` with stats_parsed compatibility and the checkpoint schema. 
#[internal_api] pub(crate) fn read_actions_with_projected_checkpoint_actions( &self, @@ -391,14 +422,24 @@ impl LogSegment { commit_read_schema: SchemaRef, checkpoint_read_schema: SchemaRef, meta_predicate: Option, - ) -> DeltaResult> + Send> { + stats_schema: Option<&StructType>, + ) -> DeltaResult< + ActionsWithCheckpointInfo> + Send>, + > { // `replay` expects commit files to be sorted in descending order, so the return value here is correct let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?; - let checkpoint_stream = - self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?; + let (checkpoint_stream, checkpoint_info) = self.create_checkpoint_stream( + engine, + checkpoint_read_schema, + meta_predicate, + stats_schema, + )?; - Ok(commit_stream.chain(checkpoint_stream)) + Ok(ActionsWithCheckpointInfo { + actions: commit_stream.chain(checkpoint_stream), + checkpoint_info, + }) } // Same as above, but uses the same schema for reading checkpoints and commits. @@ -409,12 +450,14 @@ impl LogSegment { action_schema: SchemaRef, meta_predicate: Option, ) -> DeltaResult> + Send> { - self.read_actions_with_projected_checkpoint_actions( + let result = self.read_actions_with_projected_checkpoint_actions( engine, action_schema.clone(), action_schema, meta_predicate, - ) + None, + )?; + Ok(result.actions) } /// find a minimal set to cover the range of commits we want. This is greedy so not always @@ -484,7 +527,8 @@ impl LogSegment { _ => return Ok((None, vec![])), }; - // Cached hint schema for determining V1 vs V2 without footer read + // Cached hint schema for determining V1 vs V2 without footer read. + // hint_schema is Option<&SchemaRef> where SchemaRef = Arc. let hint_schema = self.checkpoint_schema.as_ref(); match checkpoint.extension.as_str() { @@ -514,12 +558,14 @@ impl LogSegment { Some(true) => { // Hint says V2 checkpoint, extract sidecars let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?; - // For V2, read first sidecar's schema + // For V2, read first sidecar's schema if sidecars exist. + // If no sidecars, V2 checkpoint may still have add actions in main file + // (like V1), so fall back to hint schema for stats_parsed check. let file_actions_schema = match sidecar_files.first() { Some(first) => { Some(engine.parquet_handler().read_parquet_footer(first)?.schema) } - None => None, + None => hint_schema.cloned(), }; Ok((file_actions_schema, sidecar_files)) } @@ -532,11 +578,14 @@ impl LogSegment { if footer.schema.field(SIDECAR_NAME).is_some() { // V2 parquet checkpoint let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?; + // For V2, read first sidecar's schema if sidecars exist. + // If no sidecars, V2 checkpoint may still have add actions in main file + // (like V1), so fall back to footer schema for stats_parsed check. let file_actions_schema = match sidecar_files.first() { Some(first) => Some( engine.parquet_handler().read_parquet_footer(first)?.schema, ), - None => None, + None => Some(footer.schema), }; Ok((file_actions_schema, sidecar_files)) } else { @@ -556,12 +605,18 @@ impl LogSegment { /// 1. Determines the files actions schema (for future stats_parsed detection) /// 2. Extracts sidecar file references if present (V2 checkpoints) /// 3. Reads checkpoint and sidecar data using cached sidecar refs + /// + /// Returns a tuple of the actions iterator and [`CheckpointReadInfo`]. 
fn create_checkpoint_stream( &self, engine: &dyn Engine, action_schema: SchemaRef, meta_predicate: Option, - ) -> DeltaResult> + Send> { + stats_schema: Option<&StructType>, + ) -> DeltaResult<( + impl Iterator> + Send, + CheckpointReadInfo, + )> { let need_file_actions = schema_contains_file_actions(&action_schema); // Extract file actions schema and sidecar files @@ -574,22 +629,61 @@ impl LogSegment { (None, vec![]) }; - // (Future) Determine if there are usable parsed stats - // let _has_stats_parsed = file_actions_schema.as_ref() - // .map(|s| Self::schema_has_compatible_stats_parsed(s, stats_schema)) - // .unwrap_or(false); - let _ = file_actions_schema; // Suppress unused warning for now - - // Read the actual checkpoint files, using cached sidecar files - // We expand sidecars if we have them and need file actions - let checkpoint_read_schema = if need_file_actions - && !sidecar_files.is_empty() - && !action_schema.contains(SIDECAR_NAME) - { - Arc::new( - action_schema.add([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())])?, + // Check if checkpoint has compatible stats_parsed and add it to the schema if so + let has_stats_parsed = + stats_schema + .zip(file_actions_schema.as_ref()) + .is_some_and(|(stats, file_schema)| { + Self::schema_has_compatible_stats_parsed(file_schema, stats) + }); + + // Build final schema with any additional fields needed (stats_parsed, sidecar) + // Only modify the schema if it has an "add" field (i.e., we need file actions) + let augmented_checkpoint_read_schema = if let Some(add_field) = action_schema.field("add") { + let DataType::Struct(add_struct) = add_field.data_type() else { + return Err(Error::internal_error( + "add field in action schema must be a struct", + )); + }; + let mut add_fields: Vec = add_struct.fields().cloned().collect(); + + // Add stats_parsed if checkpoint has compatible parsed stats + if let (true, Some(stats_schema)) = (has_stats_parsed, stats_schema) { + add_fields.push(StructField::nullable( + "stats_parsed", + DataType::Struct(Box::new(stats_schema.clone())), + )); + } + + // Rebuild the add field with any new fields (stats_parsed) + let new_add_field = StructField::new( + add_field.name(), + StructType::new_unchecked(add_fields), + add_field.is_nullable(), ) + .with_metadata(add_field.metadata.clone()); + + // Rebuild schema with modified add field + let mut new_fields: Vec = action_schema + .fields() + .map(|f| { + if f.name() == "add" { + new_add_field.clone() + } else { + f.clone() + } + }) + .collect(); + + // Add sidecar column at top-level for V2 checkpoints + if need_file_actions && !sidecar_files.is_empty() { + new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())); + } + + Arc::new(StructType::new_unchecked(new_fields)) } else { + // Schema doesn't have "add" field (e.g., for metadata/protocol only reads) + // Use the action_schema as-is action_schema.clone() }; @@ -609,14 +703,14 @@ impl LogSegment { Some(parsed_log_path) if parsed_log_path.extension == "json" => { engine.json_handler().read_json_files( &checkpoint_file_meta, - checkpoint_read_schema.clone(), + augmented_checkpoint_read_schema.clone(), meta_predicate.clone(), )? 
} Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler .read_parquet_files( &checkpoint_file_meta, - checkpoint_read_schema.clone(), + augmented_checkpoint_read_schema.clone(), meta_predicate.clone(), )?, Some(parsed_log_path) => { @@ -644,7 +738,11 @@ impl LogSegment { .map_ok(|batch| ActionsBatch::new(batch, false)) .chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false))); - Ok(actions_iter) + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed, + checkpoint_read_schema: augmented_checkpoint_read_schema, + }; + Ok((actions_iter, checkpoint_info)) } /// Extracts sidecar file references from a checkpoint file. @@ -804,8 +902,7 @@ impl LogSegment { /// use physical column names (not logical names), so direct name comparison is correct. /// /// Returns `false` if stats_parsed doesn't exist or has incompatible types. - #[allow(dead_code)] - fn schema_has_compatible_stats_parsed( + pub(crate) fn schema_has_compatible_stats_parsed( checkpoint_schema: &StructType, stats_schema: &StructType, ) -> bool { @@ -833,42 +930,52 @@ impl LogSegment { // While these typically have the same schema, the protocol doesn't guarantee it, // so we check both to be safe. for field_name in ["minValues", "maxValues"] { - let Some(values_field) = stats_struct.field(field_name) else { + let Some(checkpoint_values_field) = stats_struct.field(field_name) else { // stats_parsed exists but no minValues/maxValues - unusual but valid continue; }; // minValues/maxValues must be a Struct containing per-column statistics. // If it exists but isn't a Struct, the schema is malformed and unusable. - let DataType::Struct(values_struct) = values_field.data_type() else { + let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else { debug!( - "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}", + "stats_parsed not compatible: stats_parsed. 
{} is not a Struct, got {:?}", field_name, - values_field.data_type() + checkpoint_values_field.data_type() ); return false; }; - // Check type compatibility for each column in the checkpoint's stats_parsed - // that also exists in the stats schema (columns needed for data skipping) - for checkpoint_field in values_struct.fields() { - if let Some(stats_field) = stats_schema.field(&checkpoint_field.name) { + // Get the corresponding field from stats_schema (e.g., stats_schema.minValues) + let Some(stats_values_field) = stats_schema.field(field_name) else { + // stats_schema doesn't have minValues/maxValues, skip this check + continue; + }; + let DataType::Struct(stats_values) = stats_values_field.data_type() else { + // stats_schema.minValues/maxValues isn't a struct - shouldn't happen but skip + continue; + }; + + // Check type compatibility for each column needed for data skipping + // If it exists in checkpoint, verify types are compatible + for stats_field in stats_values.fields() { + if let Some(checkpoint_field) = checkpoint_values.field(&stats_field.name) { if checkpoint_field .data_type() .can_read_as(stats_field.data_type()) .is_err() { debug!( - "stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema has {:?}", - checkpoint_field.name, - field_name, + "stats_parsed not compatible: incompatible type for column '{}' in {field_name}: checkpoint has {:?}, stats schema needs {:?}", + stats_field.name, checkpoint_field.data_type(), stats_field.data_type() ); return false; } } - // If column doesn't exist in stats schema, it's fine (not needed for data skipping) + // If the column is missing from checkpoint's stats_parsed, it will return + // null when accessed, which is acceptable for data skipping. 
} } diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 3d39933cea..7dffa366d9 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -1170,8 +1170,12 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schem None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1235,8 +1239,12 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_ None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; // Assert the correctness of batches returned for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() { @@ -1295,8 +1303,12 @@ async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_si None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1344,8 +1356,8 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidec None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None)?; + let (mut iter, _checkpoint_info) = + log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None, None)?; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1431,8 +1443,12 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar None, None, )?; - let mut iter = - log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?; + let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + &engine, + v2_checkpoint_read_schema.clone(), + None, + None, + )?; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -2709,6 +2725,18 @@ fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec) -> StructType { + StructType::new_unchecked([ + StructField::nullable("numRecords", DataType::LONG), + StructField::nullable( + "minValues", + StructType::new_unchecked(column_fields.clone()), + ), + StructField::nullable("maxValues", StructType::new_unchecked(column_fields)), + ]) +} + // Helper to create a checkpoint schema without stats_parsed fn create_checkpoint_schema_without_stats_parsed() -> StructType { use crate::schema::StructType; @@ -2731,7 +2759,7 @@ fn test_schema_has_compatible_stats_parsed_basic() { )]); // Exact type match should work - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, &stats_schema @@ -2739,7 +2767,7 @@ fn test_schema_has_compatible_stats_parsed_basic() { // Type widening 
(int -> long) should work let stats_schema_widened = - StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]); + create_stats_schema(vec![StructField::nullable("id", DataType::LONG)]); assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, &stats_schema_widened @@ -2759,17 +2787,18 @@ fn test_schema_has_compatible_stats_parsed_basic() { #[test] fn test_schema_has_compatible_stats_parsed_missing_column_ok() { - // Checkpoint has "id" column, stats schema needs "other" column (missing in checkpoint is OK) + // Checkpoint has "id" column, stats schema needs "other" column + // Missing column is acceptable - it will return null when accessed let checkpoint_schema = create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable( "id", DataType::INTEGER, )]); - let stats_schema = - StructType::new_unchecked([StructField::nullable("other", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("other", DataType::INTEGER)]); - // Missing column in checkpoint is OK - it will just return NULL + // Missing column in checkpoint is OK - it will return null when accessed, + // which is acceptable for data skipping (just means we can't skip based on that column) assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, &stats_schema @@ -2784,7 +2813,7 @@ fn test_schema_has_compatible_stats_parsed_extra_column_ok() { StructField::nullable("extra", DataType::STRING), ]); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); assert!(LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, @@ -2797,7 +2826,7 @@ fn test_schema_has_compatible_stats_parsed_no_stats_parsed() { // Checkpoint schema without stats_parsed field let checkpoint_schema = create_checkpoint_schema_without_stats_parsed(); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); assert!(!LogSegment::schema_has_compatible_stats_parsed( &checkpoint_schema, @@ -2814,7 +2843,7 @@ fn test_schema_has_compatible_stats_parsed_empty_stats_schema() { DataType::INTEGER, )]); - let stats_schema = StructType::new_unchecked([]); + let stats_schema = create_stats_schema(vec![]); // If no columns are needed for data skipping, any stats_parsed is compatible assert!(LogSegment::schema_has_compatible_stats_parsed( @@ -2832,7 +2861,7 @@ fn test_schema_has_compatible_stats_parsed_multiple_columns() { ]); // First column matches, second is incompatible - let stats_schema = StructType::new_unchecked([ + let stats_schema = create_stats_schema(vec![ StructField::nullable("good_col", DataType::LONG), StructField::nullable("bad_col", DataType::INTEGER), ]); @@ -2858,7 +2887,7 @@ fn test_schema_has_compatible_stats_parsed_missing_min_max_values() { let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); // Should return true - missing minValues/maxValues is handled gracefully with continue assert!(LogSegment::schema_has_compatible_stats_parsed( @@ -2884,7 +2913,7 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() { let checkpoint_schema = 
StructType::new_unchecked([StructField::nullable("add", add_schema)]); - let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]); + let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]); // Should return false - minValues/maxValues must be Struct types assert!(!LogSegment::schema_has_compatible_stats_parsed( diff --git a/kernel/src/scan/data_skipping/stats_schema.rs b/kernel/src/scan/data_skipping/stats_schema.rs index e18423b467..c51eacef88 100644 --- a/kernel/src/scan/data_skipping/stats_schema.rs +++ b/kernel/src/scan/data_skipping/stats_schema.rs @@ -1,11 +1,12 @@ //! This module contains logic to compute the expected schema for file statistics use std::borrow::Cow; +use std::sync::Arc; use crate::{ schema::{ - ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaTransform, - StructField, StructType, + ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaRef, + SchemaTransform, StructField, StructType, }, table_properties::{DataSkippingNumIndexedCols, TableProperties}, DeltaResult, @@ -133,6 +134,28 @@ pub(crate) fn expected_stats_schema( StructType::try_new(fields) } +/// Creates a stats schema from a referenced schema (columns from predicate). +/// Returns schema: `{ numRecords, nullCount, minValues, maxValues }` +/// +/// This is used to build the schema for parsing JSON stats and for reading stats_parsed +/// from checkpoints. +pub(crate) fn build_stats_schema(referenced_schema: &StructType) -> Option { + let stats_schema = NullableStatsTransform + .transform_struct(referenced_schema)? + .into_owned(); + + let nullcount_schema = NullCountStatsTransform + .transform_struct(&stats_schema)? + .into_owned(); + + Some(Arc::new(StructType::new_unchecked([ + StructField::nullable("numRecords", DataType::LONG), + StructField::nullable("nullCount", nullcount_schema), + StructField::nullable("minValues", stats_schema.clone()), + StructField::nullable("maxValues", stats_schema), + ]))) +} + /// Transforms a schema to make all fields nullable. /// Used for stats schemas where stats may not be available for all columns. pub(crate) struct NullableStatsTransform; diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 23ffb1d7f0..6b919d0730 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -16,7 +16,7 @@ use crate::log_replay::deduplicator::Deduplicator; use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor}; use crate::scan::Scalar; use crate::schema::ToSchema as _; -use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType}; +use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType}; use crate::table_features::ColumnMappingMode; use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec}; use crate::utils::require; @@ -32,6 +32,7 @@ struct InternalScanState { predicate_schema: Option>, transform_spec: Option>, column_mapping_mode: ColumnMappingMode, + stats_schema: Option, } /// Public-facing serialized processor state for distributed processing. 
@@ -162,6 +163,7 @@ impl ScanLogReplayProcessor { physical_predicate, transform_spec, column_mapping_mode, + stats_schema, } = self.state_info.as_ref().clone(); // Extract predicate from PhysicalPredicate @@ -177,6 +179,7 @@ impl ScanLogReplayProcessor { transform_spec, predicate_schema, column_mapping_mode, + stats_schema, }; let internal_state_blob = serde_json::to_vec(&internal_state) .map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?; @@ -232,6 +235,7 @@ impl ScanLogReplayProcessor { physical_predicate, transform_spec: internal_state.transform_spec, column_mapping_mode: internal_state.column_mapping_mode, + stats_schema: internal_state.stats_schema, }); let processor = Self::new_with_seen_files(engine, state_info, state.seen_file_keys)?; @@ -634,6 +638,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }); let iter = scan_action_iter( &SyncEngine::new(), @@ -926,6 +931,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: None, column_mapping_mode: mode, + stats_schema: None, }); let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( @@ -952,6 +958,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }); let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); let serialized = processor.into_serializable_state().unwrap(); @@ -989,6 +996,7 @@ mod tests { predicate_schema: None, // Missing! transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }; let predicate = Arc::new(crate::expressions::Predicate::column(["id"])); let invalid_blob = serde_json::to_vec(&invalid_internal_state).unwrap(); @@ -1017,6 +1025,7 @@ mod tests { predicate_schema: None, transform_spec: None, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }; let blob = serde_json::to_string(&invalid_internal_state).unwrap(); let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap(); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 05d9dde129..c3cea58d75 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -20,7 +20,7 @@ use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Sca use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResolver}; use crate::listed_log_files::ListedLogFilesBuilder; use crate::log_replay::{ActionsBatch, HasSelectionVector}; -use crate::log_segment::LogSegment; +use crate::log_segment::{ActionsWithCheckpointInfo, LogSegment}; use crate::scan::log_replay::{BASE_ROW_ID_NAME, CLUSTERING_PROVIDER_NAME}; use crate::scan::state_info::StateInfo; use crate::schema::{ @@ -435,7 +435,8 @@ impl Scan { &self, engine: &dyn Engine, ) -> DeltaResult>> { - self.scan_metadata_inner(engine, self.replay_for_scan_metadata(engine)?) + let result = self.replay_for_scan_metadata(engine)?; + self.scan_metadata_inner(engine, result.actions) } /// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s. @@ -544,13 +545,15 @@ impl Scan { None, // No checkpoint in this incremental segment )?; - let it = new_log_segment - .read_actions_with_projected_checkpoint_actions( - engine, - COMMIT_READ_SCHEMA.clone(), - CHECKPOINT_READ_SCHEMA.clone(), - None, - )? 
+ let result = new_log_segment.read_actions_with_projected_checkpoint_actions( + engine, + COMMIT_READ_SCHEMA.clone(), + CHECKPOINT_READ_SCHEMA.clone(), + None, + None, + )?; + let it = result + .actions .chain(existing_data.into_iter().map(apply_transform)); Ok(Box::new(self.scan_metadata_inner(engine, it)?)) @@ -572,7 +575,9 @@ impl Scan { fn replay_for_scan_metadata( &self, engine: &dyn Engine, - ) -> DeltaResult> + Send> { + ) -> DeltaResult< + ActionsWithCheckpointInfo> + Send>, + > { // NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping // when ~every checkpoint file will contain the adds and removes we are looking for. self.snapshot @@ -582,6 +587,7 @@ impl Scan { COMMIT_READ_SCHEMA.clone(), CHECKPOINT_READ_SCHEMA.clone(), None, + self.state_info.stats_schema.as_ref().map(|s| s.as_ref()), ) } diff --git a/kernel/src/scan/state_info.rs b/kernel/src/scan/state_info.rs index 303ec73d2c..9af8f9f9e2 100644 --- a/kernel/src/scan/state_info.rs +++ b/kernel/src/scan/state_info.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use tracing::debug; +use crate::scan::data_skipping::stats_schema::build_stats_schema; use crate::scan::field_classifiers::TransformFieldClassifier; use crate::scan::PhysicalPredicate; use crate::schema::{DataType, MetadataColumnSpec, SchemaRef, StructType}; @@ -27,6 +28,9 @@ pub(crate) struct StateInfo { pub(crate) transform_spec: Option>, /// The column mapping mode for this scan pub(crate) column_mapping_mode: ColumnMappingMode, + /// The stats schema for data skipping (built from predicate columns). + /// Used to construct checkpoint read schema with stats_parsed. + pub(crate) stats_schema: Option, } /// Validating the metadata columns also extracts information needed to properly construct the full @@ -203,6 +207,12 @@ impl StateInfo { None => PhysicalPredicate::None, }; + // Build stats schema from predicate columns for data skipping + let stats_schema = match &physical_predicate { + PhysicalPredicate::Some(_, schema) => build_stats_schema(schema), + _ => None, + }; + let transform_spec = if !transform_spec.is_empty() || column_mapping_mode != ColumnMappingMode::None { Some(Arc::new(transform_spec)) @@ -216,6 +226,7 @@ impl StateInfo { physical_predicate, transform_spec, column_mapping_mode, + stats_schema, }) } } diff --git a/kernel/src/scan/test_utils.rs b/kernel/src/scan/test_utils.rs index e11a605b85..cd340fb71a 100644 --- a/kernel/src/scan/test_utils.rs +++ b/kernel/src/scan/test_utils.rs @@ -146,6 +146,7 @@ pub(crate) fn run_with_validate_callback( physical_predicate: PhysicalPredicate::None, transform_spec, column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }); let iter = scan_action_iter( &SyncEngine::new(), diff --git a/kernel/src/scan/tests.rs b/kernel/src/scan/tests.rs index 4f852e0dbc..4c615a1bfe 100644 --- a/kernel/src/scan/tests.rs +++ b/kernel/src/scan/tests.rs @@ -374,11 +374,8 @@ fn test_replay_for_scan_metadata() { let snapshot = Snapshot::builder_for(url).build(&engine).unwrap(); let scan = snapshot.scan_builder().build().unwrap(); - let data: Vec<_> = scan - .replay_for_scan_metadata(&engine) - .unwrap() - .try_collect() - .unwrap(); + let result = scan.replay_for_scan_metadata(&engine).unwrap(); + let data: Vec<_> = result.actions.try_collect().unwrap(); // No predicate pushdown attempted, because at most one part of a multi-part checkpoint // could be skipped when looking for adds/removes. 
// diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index 25aaed0d74..808e054630 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -183,6 +183,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: Some(Arc::new(transform_spec)), column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, } } @@ -401,6 +402,7 @@ mod tests { physical_predicate: PhysicalPredicate::None, transform_spec: Some(Arc::new(transform_spec)), column_mapping_mode: ColumnMappingMode::None, + stats_schema: None, }; let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema); From 0edcfae1dc1402fbe000f650e20aab86912b1622 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Fri, 23 Jan 2026 21:24:45 +0000 Subject: [PATCH 2/3] nits --- kernel/src/log_segment.rs | 20 +++++++++++--------- kernel/src/log_segment/tests.rs | 15 ++++++++++----- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 75b7476019..2d2540c6b4 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -54,7 +54,7 @@ pub(crate) struct CheckpointReadInfo { /// and checkpoint metadata. /// /// This struct provides named access to the return values instead of tuple indexing. -pub(crate) struct ActionsWithCheckpointInfo { +pub(crate) struct ActionsWithCheckpointInfo> + Send> { /// Iterator over action batches read from the log segment. pub actions: A, /// Metadata about checkpoint reading, including the schema used. @@ -429,7 +429,7 @@ impl LogSegment { // `replay` expects commit files to be sorted in descending order, so the return value here is correct let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?; - let (checkpoint_stream, checkpoint_info) = self.create_checkpoint_stream( + let checkpoint_result = self.create_checkpoint_stream( engine, checkpoint_read_schema, meta_predicate, @@ -437,8 +437,8 @@ impl LogSegment { )?; Ok(ActionsWithCheckpointInfo { - actions: commit_stream.chain(checkpoint_stream), - checkpoint_info, + actions: commit_stream.chain(checkpoint_result.actions), + checkpoint_info: checkpoint_result.checkpoint_info, }) } @@ -613,10 +613,9 @@ impl LogSegment { action_schema: SchemaRef, meta_predicate: Option, stats_schema: Option<&StructType>, - ) -> DeltaResult<( - impl Iterator> + Send, - CheckpointReadInfo, - )> { + ) -> DeltaResult< + ActionsWithCheckpointInfo> + Send>, + > { let need_file_actions = schema_contains_file_actions(&action_schema); // Extract file actions schema and sidecar files @@ -742,7 +741,10 @@ impl LogSegment { has_stats_parsed, checkpoint_read_schema: augmented_checkpoint_read_schema, }; - Ok((actions_iter, checkpoint_info)) + Ok(ActionsWithCheckpointInfo { + actions: actions_iter, + checkpoint_info, + }) } /// Extracts sidecar file references from a checkpoint file. 
diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs index 7dffa366d9..6fd58f7de2 100644 --- a/kernel/src/log_segment/tests.rs +++ b/kernel/src/log_segment/tests.rs @@ -1170,12 +1170,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schem None, None, )?; - let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + let checkpoint_result = log_segment.create_checkpoint_stream( &engine, v2_checkpoint_read_schema.clone(), None, None, )?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1239,12 +1240,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_ None, None, )?; - let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + let checkpoint_result = log_segment.create_checkpoint_stream( &engine, v2_checkpoint_read_schema.clone(), None, None, )?; + let mut iter = checkpoint_result.actions; // Assert the correctness of batches returned for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() { @@ -1303,12 +1305,13 @@ async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_si None, None, )?; - let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + let checkpoint_result = log_segment.create_checkpoint_stream( &engine, v2_checkpoint_read_schema.clone(), None, None, )?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1356,8 +1359,9 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidec None, None, )?; - let (mut iter, _checkpoint_info) = + let checkpoint_result = log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None, None)?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { @@ -1443,12 +1447,13 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar None, None, )?; - let (mut iter, _checkpoint_info) = log_segment.create_checkpoint_stream( + let checkpoint_result = log_segment.create_checkpoint_stream( &engine, v2_checkpoint_read_schema.clone(), None, None, )?; + let mut iter = checkpoint_result.actions; // Assert that the first batch returned is from reading checkpoint file 1 let ActionsBatch { From 6b96787c66221ca437338c613070eecea3014b99 Mon Sep 17 00:00:00 2001 From: DrakeLin Date: Fri, 23 Jan 2026 07:21:39 +0000 Subject: [PATCH 3/3] ps --- kernel/src/log_replay.rs | 9 +- kernel/src/log_segment.rs | 1 - kernel/src/parallel/sequential_phase.rs | 19 ++- kernel/src/scan/data_skipping.rs | 114 +++++++++----- kernel/src/scan/data_skipping/stats_schema.rs | 1 - kernel/src/scan/log_replay.rs | 147 +++++++++++++----- kernel/src/scan/mod.rs | 53 +++++-- kernel/src/scan/test_utils.rs | 8 +- kernel/src/table_changes/log_replay.rs | 19 ++- .../_delta_log/00000000000000000000.json | 4 + .../_delta_log/00000000000000000001.json | 2 + .../_delta_log/00000000000000000002.json | 2 + .../00000000000000000003.checkpoint.parquet | Bin 0 -> 22085 bytes .../_delta_log/00000000000000000003.json | 2 + .../_delta_log/00000000000000000004.json | 2 + .../_delta_log/00000000000000000005.json | 2 + .../parsed-stats/_delta_log/_last_checkpoint | 1 + ...4708-bb30-0888f35cabdd-c000.snappy.parquet | Bin 0 -> 2826 bytes ...479b-a315-4157335e9a11-c000.snappy.parquet | Bin 0 -> 3103 bytes 
...425e-98df-2e7141f9b5fb-c000.snappy.parquet | Bin 0 -> 2827 bytes ...4475-aae1-c8edc59274e6-c000.snappy.parquet | Bin 0 -> 2824 bytes ...4248-8c58-fc9f4018e43d-c000.snappy.parquet | Bin 0 -> 2828 bytes ...4e4c-b6f2-5f40c55ef515-c000.snappy.parquet | Bin 0 -> 2828 bytes kernel/tests/read.rs | 88 +++++++++++ 24 files changed, 378 insertions(+), 96 deletions(-) create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.checkpoint.parquet create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json create mode 100644 kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json create mode 100644 kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint create mode 100644 kernel/tests/data/parsed-stats/part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet create mode 100644 kernel/tests/data/parsed-stats/part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet create mode 100644 kernel/tests/data/parsed-stats/part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet create mode 100644 kernel/tests/data/parsed-stats/part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet create mode 100644 kernel/tests/data/parsed-stats/part-00000-a4c1def5-742e-4248-8c58-fc9f4018e43d-c000.snappy.parquet create mode 100644 kernel/tests/data/parsed-stats/part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs index 8881a2c3cf..f334b0e12b 100644 --- a/kernel/src/log_replay.rs +++ b/kernel/src/log_replay.rs @@ -310,13 +310,18 @@ pub(crate) trait LogReplayProcessor: Sized { /// /// # Parameters /// - `batch`: A reference to the batch of actions to be processed. + /// - `is_log_batch`: Whether this batch is from a commit log (`true`) or checkpoint (`false`). /// /// # Returns /// A `DeltaResult>`, where each boolean indicates if the corresponding row should be included. /// If no filter is provided, all rows are selected. - fn build_selection_vector(&self, batch: &dyn EngineData) -> DeltaResult> { + fn build_selection_vector( + &self, + batch: &dyn EngineData, + is_log_batch: bool, + ) -> DeltaResult> { match self.data_skipping_filter() { - Some(filter) => filter.apply(batch), + Some(filter) => filter.apply(batch, is_log_batch), None => Ok(vec![true; batch.len()]), // If no filter is provided, select all rows } } diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 2d2540c6b4..429227915e 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -58,7 +58,6 @@ pub(crate) struct ActionsWithCheckpointInfo Iterator for SequentialPhase

{ #[cfg(test)] mod tests { use super::*; + use crate::actions::get_log_add_schema; + use crate::log_segment::CheckpointReadInfo; use crate::scan::log_replay::ScanLogReplayProcessor; use crate::scan::state_info::StateInfo; use crate::utils::test_utils::{assert_result_error_with_message, load_test_table}; - use std::sync::Arc; /// Core helper function to verify sequential processing with expected adds and sidecars. fn verify_sequential_processing( @@ -222,7 +223,13 @@ mod tests { (), )?); - let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + // Use base log add schema for tests - no stats_parsed optimization + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; + + let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?; let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?; // Process all batches and collect Add file paths @@ -313,7 +320,13 @@ mod tests { (), )?); - let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?; + // Use base log add schema for tests - no stats_parsed optimization + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; + + let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?; let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?; // Call next() once but don't exhaust the iterator diff --git a/kernel/src/scan/data_skipping.rs b/kernel/src/scan/data_skipping.rs index 3d69898761..1fc6ffcf5b 100644 --- a/kernel/src/scan/data_skipping.rs +++ b/kernel/src/scan/data_skipping.rs @@ -3,7 +3,6 @@ use std::sync::{Arc, LazyLock}; use tracing::{debug, error}; -use crate::actions::get_log_add_schema; use crate::actions::visitors::SelectionVectorVisitor; use crate::error::DeltaResult; use crate::expressions::{ @@ -14,14 +13,13 @@ use crate::expressions::{ use crate::kernel_predicates::{ DataSkippingPredicateEvaluator, KernelPredicateEvaluator, KernelPredicateEvaluatorDefaults, }; -use crate::schema::{DataType, SchemaRef, SchemaTransform, StructField, StructType}; +use crate::schema::{DataType, SchemaRef}; use crate::{ Engine, EngineData, ExpressionEvaluator, JsonHandler, PredicateEvaluator, RowVisitor as _, }; pub(crate) mod stats_schema; -use stats_schema::{NullCountStatsTransform, NullableStatsTransform}; #[cfg(test)] mod tests; @@ -55,46 +53,50 @@ fn as_sql_data_skipping_predicate(pred: &Pred) -> Option { pub(crate) struct DataSkippingFilter { stats_schema: SchemaRef, select_stats_evaluator: Arc, + /// Evaluator for extracting stats_parsed from checkpoints. + /// Only present when the checkpoint has compatible pre-parsed stats. + select_stats_parsed_evaluator: Option>, skipping_evaluator: Arc, filter_evaluator: Arc, json_handler: Arc, } impl DataSkippingFilter { - /// Creates a new data skipping filter. Returns None if there is no predicate, or the predicate - /// is ineligible for data skipping. + /// Creates a new data skipping filter. Returns None if there is no predicate/stats_schema, + /// or the predicate is ineligible for data skipping. /// /// NOTE: None is equivalent to a trivial filter that always returns TRUE (= keeps all files), /// but using an Option lets the engine easily avoid the overhead of applying trivial filters. 
+ /// + /// `checkpoint_read_schema` is the schema used to read checkpoint files, which includes + /// `stats_parsed` for data skipping optimization. This schema is needed to create the + /// evaluator that extracts stats_parsed from the actions. + /// + /// `has_compatible_stats_parsed` indicates whether the checkpoint has compatible pre-parsed + /// stats. When true, checkpoint batches use stats_parsed directly instead of parsing JSON. pub(crate) fn new( engine: &dyn Engine, - physical_predicate: Option<(PredicateRef, SchemaRef)>, + predicate: Option, + stats_schema: Option, + checkpoint_read_schema: SchemaRef, + has_compatible_stats_parsed: bool, ) -> Option { static STATS_EXPR: LazyLock = LazyLock::new(|| Arc::new(column_expr!("add.stats"))); + static STATS_PARSED_EXPR: LazyLock = + LazyLock::new(|| Arc::new(column_expr!("add.stats_parsed"))); static FILTER_PRED: LazyLock = LazyLock::new(|| Arc::new(column_expr!("output").distinct(Expr::literal(false)))); - let (predicate, referenced_schema) = physical_predicate?; + let predicate = predicate?; + let stats_schema = stats_schema?; debug!("Creating a data skipping filter for {:#?}", predicate); - let stats_schema = NullableStatsTransform - .transform_struct(&referenced_schema)? - .into_owned(); - - let nullcount_schema = NullCountStatsTransform - .transform_struct(&stats_schema)? - .into_owned(); - let stats_schema = Arc::new(StructType::new_unchecked([ - StructField::nullable("numRecords", DataType::LONG), - StructField::nullable("nullCount", nullcount_schema), - StructField::nullable("minValues", stats_schema.clone()), - StructField::nullable("maxValues", stats_schema), - ])); - // Skipping happens in several steps: // - // 1. The stats selector fetches add.stats from the metadata + // 1. The stats selector fetches add.stats or add.stats_parsed from the metadata. + // For checkpoint batches with compatible stats_parsed, we use stats_parsed directly. + // Otherwise, we parse add.stats (JSON string) to a stats struct. // // 2. The predicate (skipping evaluator) produces false for any file whose stats prove we // can safely skip it. A value of true means the stats say we must keep the file, and @@ -106,7 +108,7 @@ impl DataSkippingFilter { let select_stats_evaluator = engine .evaluation_handler() .new_expression_evaluator( - get_log_add_schema().clone(), + checkpoint_read_schema.clone(), STATS_EXPR.clone(), DataType::STRING, ) @@ -115,6 +117,23 @@ impl DataSkippingFilter { .inspect_err(|e| error!("Failed to create select stats evaluator: {e}")) .ok()?; + // Only create stats_parsed evaluator when checkpoint has compatible pre-parsed stats + let select_stats_parsed_evaluator = if has_compatible_stats_parsed { + engine + .evaluation_handler() + .new_expression_evaluator( + checkpoint_read_schema.clone(), + STATS_PARSED_EXPR.clone(), + DataType::Struct(Box::new(stats_schema.as_ref().clone())), + ) + .inspect_err(|e| { + debug!("stats_parsed evaluator not available (falling back to JSON): {e}") + }) + .ok() + } else { + None + }; + let skipping_evaluator = engine .evaluation_handler() .new_predicate_evaluator( @@ -137,6 +156,7 @@ impl DataSkippingFilter { Some(Self { stats_schema, select_stats_evaluator, + select_stats_parsed_evaluator, skipping_evaluator, filter_evaluator, json_handler: engine.json_handler(), @@ -145,24 +165,48 @@ impl DataSkippingFilter { /// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector /// which can be applied to the actions to find those that passed data skipping. 
- pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult> { - // retrieve and parse stats from actions data - let stats = self.select_stats_evaluator.evaluate(actions)?; - assert_eq!(stats.len(), actions.len()); - let parsed_stats = self - .json_handler - .parse_json(stats, self.stats_schema.clone())?; - assert_eq!(parsed_stats.len(), actions.len()); - - // evaluate the predicate on the parsed stats, then convert to selection vector - let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?; + /// + /// `is_log_batch` indicates whether this batch is from a commit log (`true`) or checkpoint (`false`). + /// Checkpoint batches may have pre-parsed stats (`stats_parsed`) that can be used directly + /// instead of parsing JSON. Commit batches only have JSON stats. + pub(crate) fn apply( + &self, + actions: &dyn EngineData, + is_log_batch: bool, + ) -> DeltaResult> { + // Get the final stats by either: + // 1. Using stats_parsed directly (for checkpoint batches with compatible stats) + // 2. Parsing JSON (for commit batches or when stats_parsed unavailable) + let final_stats = if let Some(stats_parsed_evaluator) = (!is_log_batch) + .then_some(()) + .and(self.select_stats_parsed_evaluator.as_ref()) + { + // Checkpoint batch with compatible stats_parsed - use it directly + let stats_parsed = stats_parsed_evaluator.evaluate(actions)?; + debug!( + "Using stats_parsed from checkpoint ({} rows)", + stats_parsed.len() + ); + stats_parsed + } else { + // Commit batch or no stats_parsed evaluator - parse JSON + let stats_json = self.select_stats_evaluator.evaluate(actions)?; + assert_eq!(stats_json.len(), actions.len()); + self.json_handler + .parse_json(stats_json, self.stats_schema.clone())? + }; + + assert_eq!(final_stats.len(), actions.len()); + + // Evaluate predicate on the stats + let skipping_predicate = self.skipping_evaluator.evaluate(&*final_stats)?; assert_eq!(skipping_predicate.len(), actions.len()); let selection_vector = self .filter_evaluator .evaluate(skipping_predicate.as_ref())?; assert_eq!(selection_vector.len(), actions.len()); - // visit the engine's selection vector to produce a Vec + // Visit the engine's selection vector to produce a Vec let mut visitor = SelectionVectorVisitor::default(); visitor.visit_rows_of(selection_vector.as_ref())?; Ok(visitor.selection_vector) diff --git a/kernel/src/scan/data_skipping/stats_schema.rs b/kernel/src/scan/data_skipping/stats_schema.rs index c51eacef88..df0f1dfb5e 100644 --- a/kernel/src/scan/data_skipping/stats_schema.rs +++ b/kernel/src/scan/data_skipping/stats_schema.rs @@ -181,7 +181,6 @@ impl<'a> SchemaTransform<'a> for NullableStatsTransform { /// All leaf fields (primitives, arrays, maps, variants) are converted to LONG type /// since null counts are always integers, while struct fields are recursed into /// to preserve the nested structure. 
-#[allow(unused)] pub(crate) struct NullCountStatsTransform; impl<'a> SchemaTransform<'a> for NullCountStatsTransform { fn transform_struct_field(&mut self, field: &'a StructField) -> Option> { diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index 6b919d0730..a830babcab 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -8,12 +8,12 @@ use super::data_skipping::DataSkippingFilter; use super::state_info::StateInfo; use super::{PhysicalPredicate, ScanMetadata}; use crate::actions::deletion_vector::DeletionVectorDescriptor; -use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, PredicateRef}; use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _}; use crate::log_replay::deduplicator::Deduplicator; use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor}; +use crate::log_segment::CheckpointReadInfo; use crate::scan::Scalar; use crate::schema::ToSchema as _; use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType}; @@ -46,6 +46,8 @@ pub struct SerializableScanState { pub internal_state_blob: Vec, /// Set of file action keys that have already been processed. pub seen_file_keys: HashSet, + /// Information about checkpoint reading for data skipping optimization + pub(crate) checkpoint_info: CheckpointReadInfo, } /// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan. @@ -92,8 +94,12 @@ impl ScanLogReplayProcessor { const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns /// Create a new [`ScanLogReplayProcessor`] instance - pub(crate) fn new(engine: &dyn Engine, state_info: Arc) -> DeltaResult { - Self::new_with_seen_files(engine, state_info, Default::default()) + pub(crate) fn new( + engine: &dyn Engine, + state_info: Arc, + checkpoint_info: CheckpointReadInfo, + ) -> DeltaResult { + Self::new_with_seen_files(engine, state_info, checkpoint_info, Default::default()) } /// Create new [`ScanLogReplayProcessor`] with pre-populated seen_file_keys. @@ -104,35 +110,38 @@ impl ScanLogReplayProcessor { /// # Parameters /// - `engine`: Engine for creating evaluators and filters /// - `state_info`: StateInfo containing schemas, transforms, and predicates + /// - `checkpoint_info`: Information about checkpoint reading for data skipping /// - `seen_file_keys`: Pre-computed set of file action keys that have been seen pub(crate) fn new_with_seen_files( engine: &dyn Engine, state_info: Arc, + checkpoint_info: CheckpointReadInfo, seen_file_keys: HashSet, ) -> DeltaResult { - // Extract the physical predicate from StateInfo's PhysicalPredicate enum. - // The DataSkippingFilter and partition_filter components expect the predicate - // in the format Option<(PredicateRef, SchemaRef)>, so we need to convert from - // the enum representation to the tuple format. - let physical_predicate = match &state_info.physical_predicate { - PhysicalPredicate::Some(predicate, schema) => { - // Valid predicate that can be used for data skipping and partition filtering - Some((predicate.clone(), schema.clone())) - } + // Extract the predicate from StateInfo's PhysicalPredicate enum. 
+ let predicate = match &state_info.physical_predicate { + PhysicalPredicate::Some(predicate, _) => Some(predicate.clone()), PhysicalPredicate::StaticSkipAll => { debug_assert!(false, "StaticSkipAll case should be handled at a higher level and not reach this code"); None } - PhysicalPredicate::None => { - // No predicate provided - None - } + PhysicalPredicate::None => None, }; + let CheckpointReadInfo { + has_stats_parsed, + checkpoint_read_schema, + } = checkpoint_info; Ok(Self { - partition_filter: physical_predicate.as_ref().map(|(e, _)| e.clone()), - data_skipping_filter: DataSkippingFilter::new(engine, physical_predicate), + partition_filter: predicate.clone(), + data_skipping_filter: DataSkippingFilter::new( + engine, + predicate, + state_info.stats_schema.clone(), + checkpoint_read_schema.clone(), + has_stats_parsed, + ), add_transform: engine.evaluation_handler().new_expression_evaluator( - get_log_add_schema().clone(), + checkpoint_read_schema, get_add_transform_expr(), SCAN_ROW_DATATYPE.clone(), )?, @@ -156,7 +165,10 @@ impl ScanLogReplayProcessor { /// undefined behaviour! #[internal_api] #[allow(unused)] - pub(crate) fn into_serializable_state(self) -> DeltaResult { + pub(crate) fn into_serializable_state( + self, + checkpoint_info: CheckpointReadInfo, + ) -> DeltaResult { let StateInfo { logical_schema, physical_schema, @@ -184,13 +196,12 @@ impl ScanLogReplayProcessor { let internal_state_blob = serde_json::to_vec(&internal_state) .map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?; - let state = SerializableScanState { + Ok(SerializableScanState { predicate, internal_state_blob, seen_file_keys: self.seen_file_keys, - }; - - Ok(state) + checkpoint_info, + }) } /// Reconstruct a processor from serialized state. @@ -238,7 +249,12 @@ impl ScanLogReplayProcessor { stats_schema: internal_state.stats_schema, }); - let processor = Self::new_with_seen_files(engine, state_info, state.seen_file_keys)?; + let processor = Self::new_with_seen_files( + engine, + state_info, + state.checkpoint_info, + state.seen_file_keys, + )?; Ok(Arc::new(processor)) } @@ -508,7 +524,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor { // Build an initial selection vector for the batch which has had the data skipping filter // applied. The selection vector is further updated by the deduplication visitor to remove // rows that are not valid adds. - let selection_vector = self.build_selection_vector(actions.as_ref())?; + let selection_vector = self.build_selection_vector(actions.as_ref(), is_log_batch)?; assert_eq!(selection_vector.len(), actions.len()); let deduplicator = FileActionDeduplicator::new( @@ -553,8 +569,12 @@ pub(crate) fn scan_action_iter( engine: &dyn Engine, action_iter: impl Iterator>, state_info: Arc, + checkpoint_info: CheckpointReadInfo, ) -> DeltaResult>> { - Ok(ScanLogReplayProcessor::new(engine, state_info)?.process_actions_iter(action_iter)) + Ok( + ScanLogReplayProcessor::new(engine, state_info, checkpoint_info)? 
+ .process_actions_iter(action_iter), + ) } #[cfg(test)] @@ -562,10 +582,11 @@ mod tests { use std::collections::{HashMap, HashSet}; use std::sync::Arc; - use crate::actions::get_commit_schema; + use crate::actions::{get_commit_schema, get_log_add_schema}; use crate::engine::sync::SyncEngine; use crate::expressions::{BinaryExpressionOp, Scalar}; use crate::log_replay::ActionsBatch; + use crate::log_segment::CheckpointReadInfo; use crate::scan::state::ScanFile; use crate::scan::state_info::tests::{ assert_transform_spec, get_simple_state_info, get_state_info, @@ -640,12 +661,17 @@ mod tests { column_mapping_mode: ColumnMappingMode::None, stats_schema: None, }); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), state_info, + checkpoint_info, ) .unwrap(); for res in iter { @@ -666,12 +692,17 @@ mod tests { let partition_cols = vec!["date".to_string()]; let state_info = get_simple_state_info(schema, partition_cols).unwrap(); let batch = vec![add_batch_with_partition_col()]; + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), Arc::new(state_info), + checkpoint_info, ) .unwrap(); @@ -744,12 +775,17 @@ mod tests { ); let batch = vec![add_batch_for_row_id(get_commit_schema().clone())]; + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), Arc::new(state_info), + checkpoint_info, ) .unwrap(); @@ -789,9 +825,14 @@ mod tests { StructField::new("id", DataType::INTEGER, true), StructField::new("value", DataType::STRING, true), ])); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let mut processor = ScanLogReplayProcessor::new( &engine, Arc::new(get_simple_state_info(schema.clone(), vec![]).unwrap()), + checkpoint_info.clone(), ) .unwrap(); @@ -806,7 +847,7 @@ mod tests { let state_info = processor.state_info.clone(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); @@ -839,6 +880,10 @@ mod tests { StructField::new("id", DataType::INTEGER, true), StructField::new("value", DataType::STRING, true), ])); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let predicate = Arc::new(crate::expressions::Predicate::eq( Expr::column(["id"]), Expr::literal(10i32), @@ -857,10 +902,12 @@ mod tests { PhysicalPredicate::Some(_, s) => s.clone(), _ => panic!("Expected predicate"), }; - let processor = ScanLogReplayProcessor::new(&engine, state_info.clone()).unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info.clone(), checkpoint_info.clone()) + .unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); @@ -881,6 +928,10 @@ mod tests 
{ StructField::new("value", DataType::INTEGER, true), StructField::new("date", DataType::DATE, true), ])); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let state_info = Arc::new( get_state_info( schema, @@ -902,10 +953,12 @@ mod tests { ); let original_transform = state_info.transform_spec.clone(); assert!(original_transform.is_some()); - let processor = ScanLogReplayProcessor::new(&engine, state_info.clone()).unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info.clone(), checkpoint_info.clone()) + .unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); assert_eq!(deserialized.state_info.transform_spec, original_transform); @@ -920,6 +973,10 @@ mod tests { ColumnMappingMode::Id, ColumnMappingMode::Name, ] { + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( "id", DataType::INTEGER, @@ -933,10 +990,11 @@ mod tests { column_mapping_mode: mode, stats_schema: None, }); - let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone()).unwrap(); let deserialized = ScanLogReplayProcessor::from_serializable_state( &engine, - processor.into_serializable_state().unwrap(), + processor.into_serializable_state(checkpoint_info).unwrap(), ) .unwrap(); assert_eq!(deserialized.state_info.column_mapping_mode, mode); @@ -947,6 +1005,10 @@ mod tests { fn test_serialization_edge_cases() { // Test edge cases: empty seen_file_keys, no predicate, no transform_spec let engine = SyncEngine::new(); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( "id", DataType::INTEGER, @@ -960,8 +1022,9 @@ mod tests { column_mapping_mode: ColumnMappingMode::None, stats_schema: None, }); - let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap(); - let serialized = processor.into_serializable_state().unwrap(); + let processor = + ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone()).unwrap(); + let serialized = processor.into_serializable_state(checkpoint_info).unwrap(); assert!(serialized.predicate.is_none()); let deserialized = ScanLogReplayProcessor::from_serializable_state(&engine, serialized).unwrap(); @@ -973,10 +1036,15 @@ mod tests { fn test_serialization_invalid_json() { // Test that invalid JSON blobs are properly rejected let engine = SyncEngine::new(); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let invalid_state = SerializableScanState { predicate: None, internal_state_blob: vec![0, 1, 2, 3, 255], // Invalid JSON seen_file_keys: HashSet::new(), + checkpoint_info, }; assert!(ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state).is_err()); } @@ -985,6 +1053,10 @@ mod tests { fn test_serialization_missing_predicate_schema() { // Test that missing predicate_schema when predicate exists is detected let engine = SyncEngine::new(); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, 
+ checkpoint_read_schema: get_log_add_schema().clone(), + }; let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new( "id", DataType::INTEGER, @@ -1004,6 +1076,7 @@ mod tests { predicate: Some(predicate), // Predicate exists but schema is None internal_state_blob: invalid_blob, seen_file_keys: HashSet::new(), + checkpoint_info, }; let result = ScanLogReplayProcessor::from_serializable_state(&engine, invalid_state); assert!(result.is_err()); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index c3cea58d75..22b1c33322 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -20,7 +20,7 @@ use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Sca use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResolver}; use crate::listed_log_files::ListedLogFilesBuilder; use crate::log_replay::{ActionsBatch, HasSelectionVector}; -use crate::log_segment::{ActionsWithCheckpointInfo, LogSegment}; +use crate::log_segment::{ActionsWithCheckpointInfo, CheckpointReadInfo, LogSegment}; use crate::scan::log_replay::{BASE_ROW_ID_NAME, CLUSTERING_PROVIDER_NAME}; use crate::scan::state_info::StateInfo; use crate::schema::{ @@ -435,8 +435,8 @@ impl Scan { &self, engine: &dyn Engine, ) -> DeltaResult>> { - let result = self.replay_for_scan_metadata(engine)?; - self.scan_metadata_inner(engine, result.actions) + let actions_with_checkpoint_info = self.replay_for_scan_metadata(engine)?; + self.scan_metadata_inner(engine, actions_with_checkpoint_info) } /// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s. @@ -511,15 +511,25 @@ impl Scan { Ok(ActionsBatch::new(transform.evaluate(data.as_ref())?, false)) }; + let log_segment = self.snapshot.log_segment(); + // If the snapshot version corresponds to the hint version, we process the existing data // to apply file skipping and provide the required transformations. + // Since we're only processing existing data (no checkpoint), we use the base schema + // and no stats_parsed optimization. if existing_version == self.snapshot.version() { - let scan = existing_data.into_iter().map(apply_transform); - return Ok(Box::new(self.scan_metadata_inner(engine, scan)?)); + let actions_with_checkpoint_info = ActionsWithCheckpointInfo { + actions: existing_data.into_iter().map(apply_transform), + checkpoint_info: CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: CHECKPOINT_READ_SCHEMA.clone(), + }, + }; + return Ok(Box::new( + self.scan_metadata_inner(engine, actions_with_checkpoint_info)?, + )); } - let log_segment = self.snapshot.log_segment(); - // If the current log segment contains a checkpoint newer than the hint version // we disregard the existing data hint, and perform a full scan. The current log segment // only has deltas after the checkpoint, so we cannot update from prior versions. @@ -545,29 +555,44 @@ impl Scan { None, // No checkpoint in this incremental segment )?; + // For incremental reads, new_log_segment has no checkpoint but we use the + // checkpoint schema returned by the function for consistency. 
let result = new_log_segment.read_actions_with_projected_checkpoint_actions( engine, COMMIT_READ_SCHEMA.clone(), CHECKPOINT_READ_SCHEMA.clone(), None, - None, + self.state_info.stats_schema.as_ref().map(|s| s.as_ref()), )?; - let it = result - .actions - .chain(existing_data.into_iter().map(apply_transform)); + let actions_with_checkpoint_info = ActionsWithCheckpointInfo { + actions: result + .actions + .chain(existing_data.into_iter().map(apply_transform)), + checkpoint_info: result.checkpoint_info, + }; - Ok(Box::new(self.scan_metadata_inner(engine, it)?)) + Ok(Box::new(self.scan_metadata_inner( + engine, + actions_with_checkpoint_info, + )?)) } fn scan_metadata_inner( &self, engine: &dyn Engine, - action_batch_iter: impl Iterator>, + actions_with_checkpoint_info: ActionsWithCheckpointInfo< + impl Iterator>, + >, ) -> DeltaResult>> { if let PhysicalPredicate::StaticSkipAll = self.state_info.physical_predicate { return Ok(None.into_iter().flatten()); } - let it = scan_action_iter(engine, action_batch_iter, self.state_info.clone())?; + let it = scan_action_iter( + engine, + actions_with_checkpoint_info.actions, + self.state_info.clone(), + actions_with_checkpoint_info.checkpoint_info, + )?; Ok(Some(it).into_iter().flatten()) } diff --git a/kernel/src/scan/test_utils.rs b/kernel/src/scan/test_utils.rs index cd340fb71a..9ced9cc717 100644 --- a/kernel/src/scan/test_utils.rs +++ b/kernel/src/scan/test_utils.rs @@ -6,8 +6,9 @@ use itertools::Itertools; use std::sync::Arc; use crate::log_replay::ActionsBatch; +use crate::log_segment::CheckpointReadInfo; use crate::{ - actions::get_commit_schema, + actions::{get_commit_schema, get_log_add_schema}, engine::{ arrow_data::ArrowEngineData, sync::{json::SyncJsonHandler, SyncEngine}, @@ -148,12 +149,17 @@ pub(crate) fn run_with_validate_callback( column_mapping_mode: ColumnMappingMode::None, stats_schema: None, }); + let checkpoint_info = CheckpointReadInfo { + has_stats_parsed: false, + checkpoint_read_schema: get_log_add_schema().clone(), + }; let iter = scan_action_iter( &SyncEngine::new(), batch .into_iter() .map(|batch| Ok(ActionsBatch::new(batch as _, true))), state_info, + checkpoint_info, ) .unwrap(); let mut batch_count = 0; diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 8332a854eb..a20f6676b7 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -13,6 +13,7 @@ use crate::actions::{ use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_name, ColumnName}; use crate::path::ParsedLogPath; +use crate::scan::data_skipping::stats_schema::build_stats_schema; use crate::scan::data_skipping::DataSkippingFilter; use crate::scan::state::DvInfo; use crate::schema::{ @@ -57,7 +58,20 @@ pub(crate) fn table_changes_action_iter( table_schema: SchemaRef, physical_predicate: Option<(PredicateRef, SchemaRef)>, ) -> DeltaResult>> { - let filter = DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new); + // For table_changes, we read commit files directly (not checkpoints), so there's no + // stats_parsed available. Build stats_schema from the predicate's referenced schema. 
+ let (predicate, stats_schema) = match physical_predicate { + Some((pred, schema)) => (Some(pred), build_stats_schema(&schema)), + None => (None, None), + }; + let filter = DataSkippingFilter::new( + engine.as_ref(), + predicate, + stats_schema, + get_log_add_schema().clone(), + false, // has_compatible_stats_parsed + ) + .map(Arc::new); let mut current_configuration = start_table_configuration.clone(); let result = commit_files @@ -281,8 +295,9 @@ impl LogReplayScanner { // Apply data skipping to get back a selection vector for actions that passed skipping. // We start our selection vector based on what was filtered. We will add to this vector // below if a file has been removed. Note: None implies all files passed data skipping. + // Table changes always reads from commit files (is_log_batch = true), not checkpoints. let selection_vector = match &filter { - Some(filter) => filter.apply(actions.as_ref())?, + Some(filter) => filter.apply(actions.as_ref(), true /* is_log_batch */)?, None => vec![true; actions.len()], }; diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json new file mode 100644 index 0000000000..57a5d1011f --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"07b3a271-f0d6-4c3b-a300-3170847ceafe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpointInterval":"3"},"createdTime":1765928274753}} +{"add":{"path":"part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet","partitionValues":{},"size":3103,"modificationTime":1765928276060,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":1,\"name\":\"name_1\",\"age\":20,\"salary\":50100},\"maxValues\":{\"id\":100,\"name\":\"name_99\",\"age\":69,\"salary\":60000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928276979,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"3103"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"e3e4034b-7a93-4d2b-9de4-3fda01735de4"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json new file mode 100644 index 0000000000..5f2597c27f --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet","partitionValues":{},"size":2826,"modificationTime":1765928279084,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":101,\"name\":\"name_101\",\"age\":20,\"salary\":60100},\"maxValues\":{\"id\":200,\"name\":\"name_200\",\"age\":69,\"salary\":70000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} 
+{"commitInfo":{"timestamp":1765928279091,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2826"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"6b5f28e4-a9cb-4c53-8b29-99e13d48a280"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json new file mode 100644 index 0000000000..2829387d25 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-a4c1def5-742e-4248-8c58-fc9f4018e43d-c000.snappy.parquet","partitionValues":{},"size":2828,"modificationTime":1765928279752,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":201,\"name\":\"name_201\",\"age\":20,\"salary\":70100},\"maxValues\":{\"id\":300,\"name\":\"name_300\",\"age\":69,\"salary\":80000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928279759,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2828"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"23695a3f-264a-427d-a3f1-8e0eda4016f5"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.checkpoint.parquet b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.checkpoint.parquet new file mode 100644 index 0000000000000000000000000000000000000000..96e4924ab30f3eacc638ea8feffe96996427a115 GIT binary patch literal 22085 zcmeG^3uqf>nj=|uRNG0NZlsgDZkzR{Kfa36Kw(L9{TeT%yeWW94{M5s> zcl1JUw_BEOuGi~@V+o~{Qp&QFrS#I$=8oei<=88wlv0ke93_Fe!vDWcugmk{l4r$@2-yvZrMQ|WWK9aFxz zXDV(_h@yBXmjPX_90Jg%mX!SAkZ}Eo>`pk6%9O|6=XNRp?DpEd36Ix4mGDiuMTb{$ zyOPy_iG}uy7sOtVA}h|gJ?>UyySq>H+T(GTXcxU+@081vkmJc0L&Alm*CV^&6EFmw zd~v(%a(L`+ho{fw@+dyp;jkaOh_7j*4LZZ@67C(#*W_GNrZZWro*ydX8?@MJ#HSE zs_+84e0M?e!3L20ZnV*C9-=2Ar2jWC5Zbi zon>s??d5Ek&<6MHHwf&lz4h|ZNZb%7N$VW@x{Y0HtI5exi&u-a{pOYyq0u0;?lvE3 zZUMq@J%BoatK3;_X6=j3to5~;x~jASaH%VOuuqpF(3Ntts|s=Z-v;Z~2KMH6*ys0j zY=A&jLm#y~T#)NP_xFDC=f5-v!iV?YWj@l~!CxM0pk7Zw%~fE4yd~F)Pv5AXY_8!O~{ghn^$Q&|)(P@CU%@7q~Nb z6x{gDdpj&b`-3#jfYO^GBg=sqIWx_93R@%Wv-Trp7}@~_{;NLVtP?wpjRL#WQR=g% zygXeiU_SmXd+WP3-E?CCbJz1Qvc7o#n#9YZ9CT>E+%uGhaIBW-I<1a+Ymh@39W8X?A$Gxdwty&$?OxwMrnU$DU}LjmooXi z&j{k)^Mbg$R}kAhX7Sv)mp>8&=ffWfCZ`zwNHF(_j>Zc@8X5!_-6HVh{nJOnD*QUI z#eproPrwbq1Sh`seG0dCUwYIiI2`}k|A25he9<6yo_JsAfBjI@X!7}nBSxY3)2kIJ zq1V*)zG1s`LaA0$Drlutsi-DZsi3NYQbA#))Pl-11O;$-AXDfp0eZB(TbCSVmaS@6 zkIt?oGLXCI9mD6VCPME`icpc7-u#O~G<4xG)|ku7`5dL)x4vV&(87NAeRkym9Kr7x zu{=0;%!pM|z6n$Q35$`cV{=Cm42>6L>3tSkv52slwJ*gd99Jq^0l4`94HI|YZ7>V$ z$F9N{=0!(oB}X|8RZmVyHWxD=R+aZLOHh;dP`4SF!fb%!gQZn)-q#9=!_lO* zI?8441^Kl8wkCRVR7!X0a3Sc|lFo$iWY5nH)@x9K`Pj!^Oa+xBs!CGgk^6uRyny49 z>5a{-eJMVnx++@%xTQ<861wD-hw3s?1Eg}2;vIwas)4)F4|`HoaH2 ze&sa?27~F7y--G8{XK~(y!Frgx7=R+aV%%YYV#J;gYLk^};#ttwkdC_aChLh-y8LUCa)_q!+6kSwZZkWAw5}mVN(p6`mdwx(kO^liN6Nb9TDxtNL`{8Lyimg%sBga?6w!~-zV!5 zt}G7UnPAUN-1f3vc)Tvz^dZMh_lHN>)kiliUfEK^T8sCeoi1ecuBJsLojs!z7e;aY z{*}}0nbS2_6*fTW4Y{Bbjndwvl2WvVB`DI<3seXo**`qV{^Q9SWCd6*YAdc8(RM`B zKEB$>{sh|*>MD^cPx(Souk|py8b5-Y61v5qHIe=*S&np{vYb~kNhLWjn^I0Hc^Oxu zht$svM}2*+!hMR%uIMq}&azKt>!q~5_AjO?gs!#rhjM-}&we$( zb=3blvmW&_J-?S`m(w-rk66aE33u0}12E``eQ}rU>~q+s#AL7Cop9j}4{X`E9Izwg 
z?n@~0lmcUP$%MSOx$8hZaJIU70Tb$^{k&i>3hbMw3KDmj(-2TW2qA7iiP*^l!zjk6JAzd2KdSW8mL zB^GBFFq2XWLYg<0*yl?%X+rU(IYdhr&JEN(#ws@7rGBz$4@JwpOP2d`xxkaRV7aEL z>|$EZn->fz;;a~t7B7uLwlCqHCw|F9-EubM0uV4I$=r2Vb&`owOS zBRt)iUs=G#`dof-DUpXI*{NA2h3or|!qPTh$A@+OZmnA?o0*0;ER^MV3IG92fn|Jk zOoIjIbLYA(hZ`l!kv2d%o4N9^V1{*y_J@Sjn=8UTK7)VzVZrqHxhn4SH^yyYs2~bS zR1k(3Twk05G%lSNfa}sk0aV@h*bwyN=WKWM!D}V*tH;k3lL;RNZv2&@5H7`Wpf#76 zQPT1w=zi0bO5WXVz0$(|cpgrD{0MW&9krII^^TkA(-*keR*WpLpv&HQq3EB3g`uUb zaH1lRO)aG}Ia6vjm)`}mo3xV2lfz;*zy33D*PqosrR2r^1D?%0QZ@(;m>=ckL*V;~ z`GxFkCO?$PD~oV80`lyB%K=?Y(%faM)z?<@3lL~K!e<+_d%}nF^S`Jj;qF8>Gc`NC zv`7a5s+<^k)@Z$GWY=DVy_BDWgg3`fRVy`H_GnsZY{Nw`7l_|!VI|@lTsDIXzU@8A z1){}w!Au3pw5GcFlm8KG`+Rb?_;S~bO+T2b-k z!@JFdt^5vh=-!I_?BCv=oY0a+48iQIX}9MUL;)H3Dr8lAuf7}Iwj@K4Oq3QX|abkHQcO!mXxbS^c7VC&}lkxghwA$40t{oCDJN~B5djS zyBP-2-}D0qTECEyQ8`BFP?4F7%_X@{u(q~$wBP8t*+bxLGv~V5W{+zB|2Px84d;8e z<)|*Z1;=LDEk3`>HhU;_s}7iM-C0p~=bT;jgRr;tVadv}a)*w=v7I`P#%{$?v7I}} z#dh*U72B!9P3)GOI$}F^CW!6i;T*P8M`zg19c*DccI<@hr(sbDDx})6odFnbQtz>y9q4tvrWt8*TA#a?_q|$u>OO){XOR+5>0XwVloG?ELYq z_B$KQrAH4osgEiqv8un8ouBd@q+?$`AJB4IV(vHRNL`UP-7s08kZ$h z!zif9PyCa9sJ}lSn+VT@Lj&R@eF=^u{@}=1PY>XcMu(_%5FU%B1FZNhG`tdv4v4X6 zcwCM;Qd0k%$n~!h=?|*!{b4XN01+x~4}A0^67mTNY_Jqdr!vWjV>8K+s>|Ri!25E( zDYAzY43mQCM)oGx;7^2KN+8lyJQH4#qmfl9gzyLFeIwkS{xFh5$I)#(*<~u^rM3ZAL8QbT_=|It;E6hph-8v=QQ|mKgqB2 z7m{CRpEHR;l6_S=KIQ;d8~2^LN&5~Xy!WIEUrS$Q_6>TG-xDKnBl*{}uQZMLqa$%O z9(aG9e5fAGS86_qEnp1etJJ^h1pn(V;}4xwjaPhN z00j*sOSpLa;1k)`zch)lhu3lJ-!oZ8kL_@MV>Xdsgy z@{;HSreY&=s(q^aBSG?1rpAK@eL9sm>z_yXjAmao9zrBKLD3U|4#sBUgAvl*9QN;9 zb#s0tRo_Ma2#5ssFUBV3r5yG@IH$=MSi?T6^27qSPavq_=bZcJ)b};>;|S7Dga%e1 zFN$*1w-ljV1^LZC2JpWAgTM;**>HaBCwT9{4dq9F4*O?DGIhpJU=;hWX!acCql_*; zBI`T}hazqmPnRGT=ELb!E;jB+fnUr|!uUj;PI7&z^7{R}wLG2(bqvR|@!{-b)RRGf z^dbF~RgdbA++O)68{6( z|KO;`zdC;F;thi+Fpkb4Vvrp>3k>-a7`rF%eF)RIe;mhm@;B~}P&155WDnn%#?LV2 zZ%X>SE5-i^)&uZP@rJtI-A#PbR8 z#47=53ZA5X!bjr=sW#+s#6!!$xQoUqpf}eG{yM7RQ|l}0&+#D=i9$8u9LT{q=>hyh zYJXn-L8dK(FVAlz^<*@el1_{PJzgFmr_%9N^}T_2pHbI|G9)$cB zF5u(!8ShX0ASN{*56{HGo(eQiY5jSmNlEQpt?xLOMR@cv33765G>}Dlgp@QLm{#9w zjklzaczPg@aR>Gr#r`Yn(L=3-QXp{9TS_k|YV>OOAH`O6|8p$FQ#{Ah*fKKL2mWxj zN6mNj`JX?l_E-JSKV1f2+y9P@K`E{CzoAw2y??EIQS`q-?0-;PhhA|!KQ&iM?^IBu zr}{fNq|RSab^fJf)bU6xI(#OP4r3jf=uyWLKC(;W>U%AFlk4#OLH^l~{YOS%Xhd>r z)`RFEei34o*T2Ca4Bfuz9ta)yu<`LMgzyI^>)}^d?+;=Br4euqQqOwzU?7M6J(_&# zcvevll0QRE4Gje4(7;m632rhx2mE literal 0 HcmV?d00001 diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json new file mode 100644 index 0000000000..8fe41abeea --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet","partitionValues":{},"size":2827,"modificationTime":1765928280364,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":301,\"name\":\"name_301\",\"age\":20,\"salary\":80100},\"maxValues\":{\"id\":400,\"name\":\"name_400\",\"age\":69,\"salary\":90000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928280369,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2827"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"88155a70-c671-4065-9a07-6e1eb3bbad68"}} diff --git 
a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json new file mode 100644 index 0000000000..7d87c00607 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000004.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet","partitionValues":{},"size":2824,"modificationTime":1765928281356,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":401,\"name\":\"name_401\",\"age\":20,\"salary\":90100},\"maxValues\":{\"id\":500,\"name\":\"name_500\",\"age\":69,\"salary\":100000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928281364,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2824"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"53ed470b-13c7-437f-829d-9a6704a285a0"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json new file mode 100644 index 0000000000..97cec9fa13 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/00000000000000000005.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet","partitionValues":{},"size":2828,"modificationTime":1765928282040,"dataChange":true,"stats":"{\"numRecords\":100,\"minValues\":{\"id\":501,\"name\":\"name_501\",\"age\":20,\"salary\":100100},\"maxValues\":{\"id\":600,\"name\":\"name_600\",\"age\":69,\"salary\":110000},\"nullCount\":{\"id\":0,\"name\":0,\"age\":0,\"salary\":0}}"}} +{"commitInfo":{"timestamp":1765928282047,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"100","numOutputBytes":"2828"},"engineInfo":"Apache-Spark/3.2.4 Delta-Lake/2.1.0","txnId":"e0681515-134d-427b-b265-da87d06ec8c3"}} diff --git a/kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint b/kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint new file mode 100644 index 0000000000..136ec5b697 --- /dev/null +++ b/kernel/tests/data/parsed-stats/_delta_log/_last_checkpoint @@ -0,0 +1 @@ 
+{"version":3,"size":6,"sizeInBytes":12170,"numOfAddFiles":4,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"stats","type":"string","nullable":true,"metadata":{}},{"name":"stats_parsed","type":{"type":"struct","fields":[{"name":"numRecords","type":"long","nullable":true,"metadata":{}},{"name":"minValues","type":{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"salary","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"maxValues","type":{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"salary","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"nullCount","type":{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"long","nullable":true,"metadata":{}},{"name":"age","type":"long","nullable":true,"metadata":{}},{"name":"salary","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array
","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}} diff --git a/kernel/tests/data/parsed-stats/part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-065eae2b-b4ea-4708-bb30-0888f35cabdd-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b47546b961d1d63f53f3ddcd15e88d3d09d7060d GIT binary patch literal 2826 zcmbW(dr(wm9LDi?m&5Ks7lZVic4H~pkg(QmZ(Jk8!WpKa8K$vvCLCSa<&w)qE?z=P z4NVP=%*@QpC@-0rmABNajEHdJGH+37QIhpN%PuhahZy5$_WeEYyXPF< z_dPEbPQ2fx2<50r=`bjpzFPj%2&>YeBQYql5L*mMQ9_6goh~3^bfP52D2dWoS(Kq1 zk6orrK`rV~kJ*@mxtNDX(SZ4A#AA3I3$PH2@C2H$7)$UZmSP!}<0&*_ z1ye1yaJ7;X3j?Xhysr!0u})6xGSSgUi_ zD#r}U)&mFHMh#uPy)pUW)nh3|rLQ6|&E<3@nuHr3c#(#5WFQk+7L(+1CYloG3fC35 z=^qXc>PNyO%8x6{~C}rI@?Yj** zieHh0Xo->(?X6R_x9&Pt-wX?`M*^%!#0|I+Nw^6%+>B(5MG9^~D#qbf+y*qEO$ z>OX59>Ul7{TMY@hNv1rXHNVwf@Lf(}@_my7Q>SdMDEhH+dg@Al`G&G>JA$VVRhy*4 zxntTFCLcNMJsLHsEAN5N7YDv*um9S(d2mRBel1s|9o3(R4$%f*ZZII zEjfJ8wo@WnC#9Z_X0El1;}khP zq#PTP^b4tWK6X+#eS|0G>cL;OYRC+(oHUnFgzI8QO9kK!PWq`w)`M0%2<;1cOh8e{$;olE*< ztSsjWU9?3z+ay^>93=jX!mDTST8ci3=}{DIG*TsoG?VsFOpYeqN5g6$T|jz*BHu{5 zokpBYT1R@EA}5A4G&3Ve%Sby(og+zCQ}j|4nJ7Xt6U&gLq-RJAN0IKLVZMfRK51xX z;z+l$vrO^2(4MLee_G=87PA}(?=zYM2c+Jj3en#kIRxnrHMMtdNOxqH(jD2jL%Z1| zp&z;0B%_li@k#Z_Ln(wI(WNwJUV7?)cE7O4_4pzfb?Qw#(|l=#=jGl;%;^ z!Fe~!k;TxSH?kUosj%2NH6@;xht@uXx?j6fi&cBCj)MOS{EU?G|cR8WWu&tIY{KsKstJSJjj{eAPbI(-B%)$Lv5&ZFyCtIojiJ zJG{a??y?cJvw{wG!L0f~ouj5|j-w(_=kxpOeC8GU23y^{>VPfVR$Et7UtDLi+e*p< zL4U0+`{4$gHsf@a`-#KDf>o8JRF(C?pl?QyUcMHv+ag!AWj8c7+6TTFy1>A9xkdTa ze4C~Fzc=pRyI1$^sg1&XZ+M!G$L%i8@{Y4;FV(<5v4Xzz>% literal 0 HcmV?d00001 diff --git a/kernel/tests/data/parsed-stats/part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-06d85a38-b141-479b-a315-4157335e9a11-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..cc425f75190bcebcadf8721158970e4dd124ca30 GIT binary patch literal 3103 zcmbW4ZERCj7{^b$yY`ND%5dj$FBI05t!z+A+uQYJ6c{jc;?Rk_Oc=7ew!mWd!d?^t zBuJ*9QA3Q9#SlXb0YV6ph=dQ*EQn(c#EBCT5qW2zF?_(p5JLR7PrEMRgTrS3{GR83 zUU$yZldWpbA~$FJjG1?s__ohJKbmRby*fAsE;6>yV9j8R_3CswmZf9*EF)vOu}B16 zrWp;H1OvfH5D8g?Q3MmgOvolYL3oldnvg>nLwJgiOW*{FFqR+_6v8;dc!Gs6fiRJf zN3as|2{ytcLII(WU?)r_6cLIEQwUQD4#G5olQ5m&BDe`2f=cibN(erJpAaAf38jP? 
zgb-mSVHROFp^PwxFqbfoP)?Xnc$)AGp@Q%%VFBSeLM34#;d#Ofget;|ghhmx2#X0z z2ulge2-Sp_3Cjs92sMP2gjWcw2(<)_P^ZhRKt+b3_$NYOsN|hFyx+h-+ID|HKfJb2 ziDk*!HlS~#)(~Cg=A)PgQXwx?0{I|66yT$Tpph?UF2%&$kOxvBFH{2gAU_mPOiCkK zMroc_S_t7x-=iKajyczS8rB zA;cqGU~ET*z~e8Hlw}my*ldx>kOGZ^#zPip0yGiIgRD?KWP>I_1yCVmhbBWsP%$(G znhH6fX^<0|4!IyVsXPt$FoN9g;I4;z5lPt@xO)(Y z-@}c;?MLG6huewd0g~DukeonJbQWFSg?kst;veB2Ltq(zy9MrbBo#lwJ&0iRIk;_b z&m#%_40ksI;XK^6a8Dy~{Q~zxB>y0(xqze_LBX%+vK{W9NUAQvJ%T{~4elnmiG#TW zcRyV7?{J&p4j>6!hPw*^y8<@?w-<@yD%>4N?jfnZhNKID^*XwAz#T+Vc?0fY1i3fi zu7`UON!cy9dk~0&aAR=$k$7*z?L=WcxJ?yRdxxmx=oMMiCCd!q(CesIB*qfW26_P% z*_a4|L7r|CQx%G#siGa;i~>n~(41N_*=&(Se$0e5m|~H-O}xXHj5{Shs7uCMWZswX zF{q}(+}fOc^q7om@_3)*w+0rqST@0mj=I_gqI*e!x`B+ivAm!xI4xj04K9{BKRvuvXPARcZA->P}FGCX@ z6s4|AwAoLLv7`@O9=eJ6W>Z&@cA4V9b17i6*l`?8x)Wci+Zx~V7CC()$-^2tl|Cby zaV)9(lf~4f##egO9sbK~wPA^e@g=1DDo)uhO2_k1GKxc%n_AX5wPp>M=}g?K^R;M8 zYkgCroT)lJPA`*-AL-G$F6KlDtZR?9Ia``ucQ!=Zw20QG$?xem+S)cWM{T9H*0z@R zaGT9xTU{TGMOtm8D>mBVm4T-E2<&9HSX1K~WR2~ym{u1b|u_n*e>UO$4PLH*yuDw1MvHJZ{)fEo7 vf~p$uMZ+bYuo_;i`NFEV&L3T^svfuM@q~lkVkQ2K8v0@H;OVczzXtyTISu~& literal 0 HcmV?d00001 diff --git a/kernel/tests/data/parsed-stats/part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-2d9663e0-37c0-425e-98df-2e7141f9b5fb-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..e25b8a16c141623694f07569c9cf64c4c8621287 GIT binary patch literal 2827 zcmcJRZA?>F7{^cHT1r+Hnclm0l#O-BaI0P4fPz&~K~YgrQBgrGZF!mH#kTk|WK*Y3 zoigH#F^k!ZF=`BP``}(akYzE8BZL@Zh%w8uOy)MTIpYVH_`xju-S!qp%pxw?4Zqyy z|3BxRbNW2zo=er-l}=g6SB>(3Uhcj1XP?0$59o08vO$P*`rK3@#DGpWLnkKbggwI` z-T((U!3A#cKmim&5qO~(N?<1VU>3}VIZz67VIItfGAM@yun;O>5iEu!Pzg(687zk? zSOF_x6;#7&SOaUJ2G+rP*Z{Rq2OFUtHh~`+AOJxKK_fIlGqk{)uo=S83T@C19k2x= z5QP|Y!dBP@+hGU11zoTcy5Vhj2X?`3*aPpvUf2iw;XOD22jLLB4~O9h^kf)(ALytE zSFd)bR`;HhZ|LPuK7H_E%J}MinO>n)k5Y;@e`~1T?Qmurg$vx^fdVLmBJe`7*(f<3 z*~aYc!ubSTw1ebwnS0gXUvbkm5j^yp2nF<;2!-^U2u1Xp2wwV4gkt(V0Z(!9Xi+>4 zhb+mb^zwyN`J-D`KG*x?pe#}0R7n=;I-#VeN#c3RZvr#C09jyxYGQQYZ+mq@ zr;{p*(+#38!yrvANuMmmN>DdZzM1Kh#VG>GN1z@LLkP#LbdxxSc!;Q-jrfKjcMe_D zH~221sucGUfoU$TdJg=AD4U0Sp1?RCS3OOJ5#BP~lLVq1S3Q#k5Vi%ly@&^hnuUmd zf}9GvsK>?~MCBsf&j_Y2##IlW9}uNWaL*7JDsjWO*ASkixF-;g5Sx}Ez9qD z_Yu`qxL*-iR^aZ!y^W|?iQ7joc@=I9_a?$ujT=wQYTRbrAza5A++&D`h}yM?cw%bk zasc-(Zq+*6O9ZC%xI1yJR%PJA^WzGh57+d3>@ZbAiws%662qqq8v)2(O1Y|$xLcFF7&&*@)_#ZQ@PH|10Lb+Z*qVo(& zPZkR`+f1hRNR!>);isPVcxmlhLy>4pdz)#J+wQV^glYO?J)&E}c6Py*&QQ!AY2RjV z4aNLHf6Q+>lG0_3?dS+uORUjYq%#n++N_N&p>Qy2E!ogzRWnXgOAuS@7H)5ALbY{< z!~TXatvnX8S(77LOS-zdZKF@d2N->pDO#T9X-fBBCr<3$r+d1tTbLGWX`0a*$2|pS#h&SA Q^`#p96G!+0RPk@{Kbt!QT>t<8 literal 0 HcmV?d00001 diff --git a/kernel/tests/data/parsed-stats/part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-40525115-50e1-4475-aae1-c8edc59274e6-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..d7ac169ee00421df579c130bcf11cce91aa2c420 GIT binary patch literal 2824 zcmbW(YiyHM9LMp~?V;;pr%d)dwyx7!#~eE9dfC;ALr1chnJfmCLlaJ^$Z1&t>PF z=P4zlAGgUu?$*nlT6y2P*((xEa;JtwE9-<}VJ+TAJJw-6KEMWih>iFNo3I&Muoc_z zF+Ra|?7*kkiO;YLpJO+^z#e>w4(vrI_Mr=3VL!e`H@?9Ee2atl4u|kP4&w(L!H+nK zpKuI6qX)m>I8NXsPT@4p;4IGJJbuLmT*M{(hRgUJSMUe^OwhUh(y$_Yu7#);5 z7!Jk_hLdrFk;S;d$Y$JNjJsgL-AKi7q~RWRX>oxKwjlAIRf6v5( zcB|C)s(I?lli}T}O)fC&3*Dxo6&b~+TqUWGjrC0!x2k&lx#rUJm%UZXE7z{~U)))z zmv$E`sN;#yA@Ek66=sta9PF5k45>-N3g7RSjw8@DgK zW#ALCTALoe=GBQBq0va=O5;N>56SF~A1Ye-BX}~U;<9*y`U;YgJfw}JJl|4*mvki! 
zosZN{%3DHmmXj`_=%pyFpx8qry^ZDo^Q22CdMQe0Q0$?RK9eDHNv}|hokhBhhUo>;=)|;;wv!H>O&VnE z5jK{qweR)U7XCIS#uyDcex*Vmn$g!|j8PGyuRHP%q&w79AH4zHkxfc>WZw?#R`-bd zp*wQbaJNa($-P=GBa9}cB~EUQ3%7d`<@1_wyJMK#70Nkie38Ds&K;OEvEEbZt9J8K zD@(&;3^7$+A?s;K>g^2|#U@S}>WOt%`1)Gfdbi&lm?o4?jnQN*PScrW$(SXyI-`2H z97C?mN>F;WX*yT`Ec8!}BGRLC)1PwVJSIYp5ki@hA}h-LWOA*jxTfIb=dI{w~789MOQ|2Udqs9z_p*B!qb=SFBPitstty6q~`l{L*!w`qn zZgmR7h-*gFPxf2c1(O?mL2ICPs1yXZZD{oQ7}U415b`;X617=b;}iz(u$O18^Cx!1r(!uEBNq z0dByLa1(xlpWzp{1-Ic>_zmvB@9+ow33uTy7=*vmRYe#q7Gc0KLK-MA225ZE6)f-& zj0G#C!#H>t#zO{7fK12&2^vfU8`xnIWP<~8;1QS%PM88Nmhk+ z3>*m`ck;<$-IySIO)_@%*ygb#kMAQ(b@KQ)OVJo=h%EAZeK}U)hX4ej2#TQuN}QK{1p-DU?Au zJPH-?7|e#pp%Ui66YwNd(XDgiy(&aj@=3SuGsyi0`R47xOU8J&Dv8VH`J2xryVaCk z>9kgb9MwDBH5V#sbDy3USvY@p!-C5liwib~>)&3!XJ7Q%p=PUcxH9YLtGQi=gWYL! zd#ax8do6PMX!}|7`u^&-243HNwd=zhV?SZt#09s_EZ-SeJMgayt{}N)Z-zk_3`*$) ziz>G3S3;&`ya@9pVmR(Bi%oRmaIq-!CC)(%r$)*;RIVqbW+LkQ7+E&dm8fTMD($Gd zFw{w?QPdMS!EDs6ICpUtJ8+I*6y%Vy5%ng{yhl(EU^phDu0g$sQ|(0Ei!pu*Y8&b) zoC+7}4h&@~>T=ZMINn^;EjV{@7Ubc4jFFp9$_CWyIJMJI_hZ-!P+vhkhf`IE`W}XL zI%+d&FHY$U)a@9;jT%Oc;kaj_cH-Q^neV|lh~b8iEHs~0v=8Xx*f^fM$? z`pMpfp$1T+sPWPSQMcmUC9=2(=LkkYF)15S+)5?SXfxJ_g|uQQ`u6ffje{xUk+jgV zNWwC;hN7XCRYHpy><)WPzUq*Qy-1i;yFOeR(Y3Yd+Mp?4tr)JtaA`DAFjSjim5uYF z2x$~T>&%gwwl!PM7WwPV4rxu!9^X@9{+*fH1~b8mVkS#fGFel8WJXT5J4f2IzW8MQ z8S$+u`d0PtlbCC-svR7Y-=C4cd7NXo?lv`b82Wxwr!lMwyPIoMVZUrze+cc6zS0(l zzU{8e;i4q#HB^{xGhr%|IzpS8I#+$Ad)1L&rcYm@M_)qgDWcH%s@9p!M(u8!t*K?1 zC)6C`JU#KF^}G~mX{~Q+w2cXP{GOn&72cE4x+3bK3s$s8+B_{yFMApyZJ}_eEo9qd zT`bE!xz$46m_08VxOp5@lG# z?Mlt)Dqr2v;T{?ppI~S#OVmHeXpZi`E}ZDy2OGVxL)e~5mT6`~i!)hQr_bZ_d;HFV zCGGXmu(PBj67bfQddmWV(&9*6k-sibw=`5-7YHsXi7X8S{JwzSUso0^wCgX`(4W}E KPe3jI2mb>dbPhfM literal 0 HcmV?d00001 diff --git a/kernel/tests/data/parsed-stats/part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet b/kernel/tests/data/parsed-stats/part-00000-c0cbdedc-d11b-4e4c-b6f2-5f40c55ef515-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4cdf6bfc7e54a7c298241054bf37c6757c3b176a GIT binary patch literal 2828 zcmdUxYiv_x7{}jsXYDC;xUrpM>pHD<%%P?Bwo7Nlp+hGQoycX0BT(AH7P||(Cg8wB9sBMa>}4=>FhZ+(Wdn2Pz;JJ#BM`wiV&htr@N#RsXFm%nk;^U z%kVq=0e`|3_zSMW-)XWW^r;IZp~KTd3P_L&1~7sQCYS&d!3=3I32uUP$biX^30a_k z3O9oVtZ)ltgAH=vR=5r9a635Q4#9ozs4lvw!0pK~3orP<4+T&NMNkX@ zt6B26bIds_gy%YViH`>#dE>!P9!P#~pj|Wml>%bpp^&`sP(uzA}PY86ROE5{a?3zaAth73Xzq#vt1A9ltVgY zPVySXf6E8pp?_%2wPJ9}<)VO4Kk^^a|f^EX6lx7F-B z5c~OPqggs$nsxHI-2UVKfs~o2%jONf5dHjQ>zBqCzb)T1w6^!B{twSj{Fr%%kGM@n z>)UW;=KBX^xCU38$H!>N23 z^$><-J?d)IuW`ycQTJe&H=s76p1~=81$8Ti=t7O4_Te~RMeV}5h*Px@=Lm-VHB#1~ zp2ex?Mm>m;^*ZVb)Gu&KH=*vrkl#R!p?-qn??LUsxq`F!O`H=L`I|{uk9q-T?iSRK zFl<{<+fl#ADSr!fKSugC)E3mwaDs26ZpV9F!g23F?Z&vg!;~&n?=;E7m3fDp z4sy;Bs@z4+Aq>mAsH;)qmFY#@gKFN5+K74vr+5$QRt&KhHGw<7>OGtz z820_7tU*1?Iau+&F21D>Chpb@z11S~rHWseBL}43stPe2P9B1U)DEMvE@=;CC}(vEeIQfmkLx^CU2bk}3^hHys5x8{ ztqbwFRir6;i@r7@6f=gTEpLjburXvjk`h`L9kvY3p;)MCrBM5HR-3gVk27Dg76^lE z)gD*$=<4%nYP&H{4vtn~v^1*7AF0i#%EEa~gu+)v?aEP9wI^GdF7i5zHpQG6J-(*l z@popb>x=}0g$$OgWTK{gW5y2Fo1<9N!T4Z98S$k`+ETUOGd$LQSv``PHg3RrSTBp#ZbCh(&D3qvmOYR;T9ib+Mheq!Bur(JEwsqVzG^dF ztB(CKgW43QwJFro1r$0@R=cuUsMTq)G&C)9g&ITj(-m*6>zQa%b8SPtCDrHhy8Oa2 z^_q<4T1gBgl!gSVR3Xy!zHyEVLv>l*gh@M{-0*$*Q(C=g5=s0Y;u>irO1_n%4c- ziHG;@jXk}vO;~0n$~3L6$)2dI-Q)6jU0!?slGfT-#9mYs^|`~v?tsr%To?@(c*DN% l(okX8=U-A3UF!3BJwC5D9Pm%IYA@BupXlTRP|5$n{{SpC2o(ST literal 0 HcmV?d00001 diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index ef1372713e..bfce625fa4 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -1701,3 
+1701,91 @@ async fn test_invalid_files_are_skipped() -> Result<(), Box<dyn std::error::Error>> {
+
+#[tokio::test]
+async fn stats_parsed_data_skipping() -> Result<(), Box<dyn std::error::Error>> {
+    let _ = tracing_subscriber::fmt::try_init();
+    let path = std::fs::canonicalize(PathBuf::from("./tests/data/parsed-stats/"))?;
+    let url = url::Url::from_directory_path(path).unwrap();
+    let engine = test_utils::create_default_engine(&url)?;
+
+    let snapshot = Snapshot::builder_for(url).build(engine.as_ref())?;
+
+    // Test 1: Predicate that should skip all files (id > 700).
+    // All files have a max id of 600, so no files should match.
+    let predicate = Pred::gt(column_expr!("id"), Expr::literal(700i64));
+    let scan = snapshot
+        .clone()
+        .scan_builder()
+        .with_predicate(Arc::new(predicate))
+        .build()?;
+
+    let files_scanned: usize = scan.execute(engine.clone())?.count();
+    assert_eq!(
+        files_scanned, 0,
+        "Expected 0 files when id > 700 (all files have max id 600)"
+    );
+
+    // Test 2: Predicate that should return only the first file (id < 50).
+    // Only file 1 has ids 1-100, so only its min id is < 50.
+    let predicate = Pred::lt(column_expr!("id"), Expr::literal(50i64));
+    let scan = snapshot
+        .clone()
+        .scan_builder()
+        .with_predicate(Arc::new(predicate))
+        .build()?;
+
+    let mut files_scanned = 0;
+    for _data in scan.execute(engine.clone())? {
+        files_scanned += 1;
+    }
+    assert_eq!(
+        files_scanned, 1,
+        "Expected 1 file when id < 50 (only file 1 has min id 1)"
+    );
+
+    // Test 3: Predicate using the salary column (salary > 105000).
+    // Only file 6 has salary range 100100-110000, with max 110000.
+    let predicate = Pred::gt(column_expr!("salary"), Expr::literal(105000i64));
+    let scan = snapshot
+        .clone()
+        .scan_builder()
+        .with_predicate(Arc::new(predicate))
+        .build()?;
+
+    let mut files_scanned = 0;
+    for _data in scan.execute(engine.clone())? {
+        files_scanned += 1;
+    }
+    assert_eq!(
+        files_scanned, 1,
+        "Expected 1 file when salary > 105000 (only file 6 has salary up to 110000)"
+    );
+
+    // Test 4: Predicate that matches multiple files (id > 350).
+    // Files 4, 5, and 6 have ids starting from 301, 401, and 501.
+    let predicate = Pred::gt(column_expr!("id"), Expr::literal(350i64));
+    let scan = snapshot
+        .scan_builder()
+        .with_predicate(Arc::new(predicate))
+        .build()?;
+
+    let mut files_scanned = 0;
+    for _data in scan.execute(engine)? {
+        files_scanned += 1;
+    }
+    assert_eq!(
+        files_scanned, 3,
+        "Expected 3 files when id > 350 (files 4, 5, 6 have ids > 350)"
+    );
+
+    Ok(())
+}
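
The test above exercises the end-to-end behavior, but the heart of the has_compat change is a structural check: the checkpoint's pre-parsed add.stats_parsed column must cover the fields the data-skipping stats schema expects (with matching types) before has_stats_parsed can be set and per-file JSON stats parsing skipped. The sketch below is a minimal, hypothetical illustration of that idea using a toy schema type; ToyType, ToyField, and stats_parsed_covers are invented for this illustration and are not the kernel's actual types or logic.

// Toy sketch of a "stats_parsed compatibility" check (illustration only).
#[derive(Clone, PartialEq, Debug)]
enum ToyType {
    Long,
    String,
    Struct(Vec<ToyField>),
}

#[derive(Clone, PartialEq, Debug)]
struct ToyField {
    name: String,
    data_type: ToyType,
}

/// True when every field the stats schema expects (numRecords, minValues.id, ...)
/// is present in the checkpoint's stats_parsed struct with the same type.
fn stats_parsed_covers(expected: &[ToyField], stats_parsed: &[ToyField]) -> bool {
    expected.iter().all(|want| {
        stats_parsed.iter().any(|have| {
            have.name == want.name
                && match (&want.data_type, &have.data_type) {
                    (ToyType::Struct(w), ToyType::Struct(h)) => stats_parsed_covers(w, h),
                    (w, h) => w == h,
                }
        })
    })
}

fn main() {
    let field = |name: &str, t: ToyType| ToyField { name: name.into(), data_type: t };

    // Stats schema derived from the predicate `id > 700`: only min/max of `id` matter.
    let expected = vec![
        field("minValues", ToyType::Struct(vec![field("id", ToyType::Long)])),
        field("maxValues", ToyType::Struct(vec![field("id", ToyType::Long)])),
    ];

    // Shape of stats_parsed as written by the checkpoint under tests/data/parsed-stats.
    let stats_parsed = vec![
        field("numRecords", ToyType::Long),
        field(
            "minValues",
            ToyType::Struct(vec![
                field("id", ToyType::Long),
                field("name", ToyType::String),
                field("age", ToyType::Long),
                field("salary", ToyType::Long),
            ]),
        ),
        field(
            "maxValues",
            ToyType::Struct(vec![
                field("id", ToyType::Long),
                field("name", ToyType::String),
                field("age", ToyType::Long),
                field("salary", ToyType::Long),
            ]),
        ),
    ];

    // Compatible: min/max for `id` can come straight from stats_parsed, no JSON parsing needed.
    assert!(stats_parsed_covers(&expected, &stats_parsed));
}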