diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 430f10a30e..29781b7eb0 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -216,7 +216,12 @@ impl DefaultEngine { )?; let physical_data = logical_to_physical_expr.evaluate(data)?; self.parquet - .write_parquet_file(write_context.target_dir(), physical_data, partition_values) + .write_parquet_file( + write_context.target_dir(), + physical_data, + partition_values, + Some(write_context.stats_columns()), + ) .await } } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index 7ed225ddb3..cb034b91b7 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -201,6 +201,7 @@ impl DefaultParquetHandler { path: &url::Url, data: Box, partition_values: HashMap, + _stats_columns: Option<&[String]>, ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; parquet_metadata.as_record_batch(&partition_values) @@ -294,6 +295,7 @@ impl ParquetHandler for DefaultParquetHandler { /// - `location` - The full URL path where the Parquet file should be written /// (e.g., `s3://bucket/path/file.parquet`, `file:///path/to/file.parquet`). /// - `data` - An iterator of engine data to be written to the Parquet file. + /// - `stats_columns` - Optional column names for which statistics should be collected. /// /// # Returns /// @@ -302,6 +304,7 @@ impl ParquetHandler for DefaultParquetHandler { &self, location: url::Url, mut data: Box>> + Send>, + _stats_columns: Option<&[String]>, ) -> DeltaResult<()> { let store = self.store.clone(); @@ -776,7 +779,7 @@ mod tests { // Test writing through the trait method let file_url = Url::parse("memory:///test/data.parquet").unwrap(); parquet_handler - .write_parquet_file(file_url.clone(), data_iter) + .write_parquet_file(file_url.clone(), data_iter, None) .unwrap(); // Verify we can read the file back @@ -964,7 +967,7 @@ mod tests { // Write the data let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap(); parquet_handler - .write_parquet_file(file_url.clone(), data_iter) + .write_parquet_file(file_url.clone(), data_iter, None) .unwrap(); // Read it back @@ -1152,7 +1155,7 @@ mod tests { // Write the first file parquet_handler - .write_parquet_file(file_url.clone(), data_iter1) + .write_parquet_file(file_url.clone(), data_iter1, None) .unwrap(); // Create second data set with different data @@ -1168,7 +1171,7 @@ mod tests { // Overwrite with second file (overwrite=true) parquet_handler - .write_parquet_file(file_url.clone(), data_iter2) + .write_parquet_file(file_url.clone(), data_iter2, None) .unwrap(); // Read back and verify it contains the second data set @@ -1231,7 +1234,7 @@ mod tests { // Write the first file parquet_handler - .write_parquet_file(file_url.clone(), data_iter1) + .write_parquet_file(file_url.clone(), data_iter1, None) .unwrap(); // Create second data set @@ -1247,7 +1250,7 @@ mod tests { // Write again - should overwrite successfully (new behavior always overwrites) parquet_handler - .write_parquet_file(file_url.clone(), data_iter2) + .write_parquet_file(file_url.clone(), data_iter2, None) .unwrap(); // Verify the file was overwritten with the new data diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index 557ae9b226..1c0db5d72a 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -79,6 +79,7 @@ impl ParquetHandler for SyncParquetHandler { /// - 
`location` - The full URL path where the Parquet file should be written /// (e.g., `file:///path/to/file.parquet`). /// - `data` - An iterator of engine data to be written to the Parquet file. + /// - `stats_columns` - Optional column names for which statistics should be collected. /// /// # Returns /// @@ -87,6 +88,7 @@ impl ParquetHandler for SyncParquetHandler { &self, location: Url, mut data: Box>> + Send>, + _stats_columns: Option<&[String]>, ) -> DeltaResult<()> { // Convert URL to file path let path = location @@ -115,6 +117,7 @@ impl ParquetHandler for SyncParquetHandler { writer.close()?; // writer must be closed to write footer + // TODO: Implement stats collection for SyncEngine Ok(()) } @@ -174,7 +177,9 @@ mod tests { > = Box::new(std::iter::once(Ok(engine_data))); // Write the file - handler.write_parquet_file(url.clone(), data_iter).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter, None) + .unwrap(); // Verify the file exists assert!(file_path.exists()); @@ -295,7 +300,9 @@ mod tests { > = Box::new(std::iter::once(Ok(engine_data))); // Write the file - handler.write_parquet_file(url.clone(), data_iter).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter, None) + .unwrap(); // Verify the file exists assert!(file_path.exists()); @@ -370,7 +377,9 @@ mod tests { > = Box::new(std::iter::once(Ok(engine_data1))); // Write the first file - handler.write_parquet_file(url.clone(), data_iter1).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter1, None) + .unwrap(); assert!(file_path.exists()); // Create second data set with different data @@ -386,7 +395,9 @@ mod tests { > = Box::new(std::iter::once(Ok(engine_data2))); // Overwrite with second file (overwrite=true) - handler.write_parquet_file(url.clone(), data_iter2).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter2, None) + .unwrap(); // Read back and verify it contains the second data set let file = File::open(&file_path).unwrap(); @@ -445,7 +456,9 @@ mod tests { > = Box::new(std::iter::once(Ok(engine_data1))); // Write the first file - handler.write_parquet_file(url.clone(), data_iter1).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter1, None) + .unwrap(); assert!(file_path.exists()); // Create second data set @@ -461,7 +474,9 @@ mod tests { > = Box::new(std::iter::once(Ok(engine_data2))); // Write again - should overwrite successfully (new behavior always overwrites) - handler.write_parquet_file(url.clone(), data_iter2).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter2, None) + .unwrap(); // Verify the file was overwritten with the new data let file = File::open(&file_path).unwrap(); @@ -537,7 +552,9 @@ mod tests { > = Box::new(batches.into_iter()); // Write the file - handler.write_parquet_file(url.clone(), data_iter).unwrap(); + handler + .write_parquet_file(url.clone(), data_iter, None) + .unwrap(); // Verify the file exists assert!(file_path.exists()); diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index d27a1c915a..72a0f8dd0a 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -778,9 +778,10 @@ pub trait ParquetHandler: AsAny { predicate: Option, ) -> DeltaResult; - /// Write data to a Parquet file at the specified URL. + /// Write data to a Parquet file at the specified URL, collecting statistics. /// - /// This method writes the provided `data` to a Parquet file at the given `url`. 
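A minimal stand-in for the widened `write_parquet_file` signature, assuming simplified types (`ParquetWriter`, `RecordBatchStub`, and the string-error `Result` alias are invented for this sketch, not kernel types): an engine that does not collect statistics yet can accept and ignore the new `stats_columns` parameter, which is exactly what `SyncParquetHandler` does above and why the existing call sites only gain a trailing `None`.

```rust
// Simplified stand-ins: `RecordBatchStub` and the string-error `Result` alias are
// invented here; the kernel's real types are `EngineData`, `DeltaResult`, and `url::Url`.
type Result<T> = std::result::Result<T, String>;
struct RecordBatchStub; // placeholder for one chunk of engine data

trait ParquetWriter {
    /// `stats_columns`: leaf columns to collect min/max/null-count statistics for;
    /// `None` means the caller has no preference (or the handler skips stats entirely).
    fn write_parquet_file(
        &self,
        location: &str,
        data: Box<dyn Iterator<Item = Result<RecordBatchStub>> + Send>,
        stats_columns: Option<&[String]>,
    ) -> Result<()>;
}

/// A handler that writes data but does not collect statistics yet, mirroring the
/// `_stats_columns` + TODO pattern in `SyncParquetHandler`.
struct NoStatsWriter;

impl ParquetWriter for NoStatsWriter {
    fn write_parquet_file(
        &self,
        _location: &str,
        data: Box<dyn Iterator<Item = Result<RecordBatchStub>> + Send>,
        _stats_columns: Option<&[String]>, // accepted but unused for now
    ) -> Result<()> {
        for batch in data {
            let _batch = batch?; // a real handler would encode and write each batch here
        }
        Ok(())
    }
}

fn main() -> Result<()> {
    let data: Box<dyn Iterator<Item = Result<RecordBatchStub>> + Send> =
        Box::new(std::iter::once(Ok(RecordBatchStub)));
    // Callers that never collected statistics before simply pass `None`.
    NoStatsWriter.write_parquet_file("file:///tmp/out.parquet", data, None)
}
```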
+ /// This method writes the provided `data` to a Parquet file at the given `url`, + /// and collects statistics (min, max, null count) for the specified columns. /// /// This will overwrite the file if it already exists. /// @@ -789,6 +790,7 @@ pub trait ParquetHandler: AsAny { /// - `url` - The full URL path where the Parquet file should be written /// (e.g., `s3://bucket/path/file.parquet`). /// - `data` - An iterator of engine data to be written to the Parquet file. + /// - `stats_columns` - Optional column names for which statistics should be collected. /// /// # Returns /// @@ -797,6 +799,7 @@ pub trait ParquetHandler: AsAny { &self, location: url::Url, data: Box>> + Send>, + stats_columns: Option<&[String]>, ) -> DeltaResult<()>; /// Read the footer metadata from a Parquet file without reading the data. diff --git a/kernel/src/scan/data_skipping/stats_schema.rs b/kernel/src/scan/data_skipping/stats_schema.rs index e18423b467..ff4fde8b69 100644 --- a/kernel/src/scan/data_skipping/stats_schema.rs +++ b/kernel/src/scan/data_skipping/stats_schema.rs @@ -133,6 +133,120 @@ pub(crate) fn expected_stats_schema( StructType::try_new(fields) } +/// Returns the list of column names that should have statistics collected. +/// +/// This extracts just the column names without building the full stats schema, +/// making it more efficient when only the column list is needed. +#[allow(unused)] +pub(crate) fn stats_column_names( + physical_file_schema: &Schema, + table_properties: &TableProperties, +) -> Vec { + let mut filter = StatsColumnFilter::new(table_properties); + let mut columns = Vec::new(); + filter.collect_columns(physical_file_schema, &mut columns); + columns +} + +/// Handles column filtering logic for statistics based on table properties. +/// +/// Filters columns according to: +/// * `dataSkippingStatsColumns` - explicit list of columns to include (takes precedence) +/// * `dataSkippingNumIndexedCols` - number of leaf columns to include (default 32) +struct StatsColumnFilter { + n_columns: Option, + added_columns: u64, + column_names: Option>, + path: Vec, +} + +impl StatsColumnFilter { + fn new(props: &TableProperties) -> Self { + // If data_skipping_stats_columns is specified, it takes precedence + // over data_skipping_num_indexed_cols, even if that is also specified. + if let Some(column_names) = &props.data_skipping_stats_columns { + Self { + n_columns: None, + added_columns: 0, + column_names: Some(column_names.clone()), + path: Vec::new(), + } + } else { + let n_cols = props + .data_skipping_num_indexed_cols + .unwrap_or(DataSkippingNumIndexedCols::NumColumns(32)); + Self { + n_columns: Some(n_cols), + added_columns: 0, + column_names: None, + path: Vec::new(), + } + } + } + + /// Collects column names that should have statistics. + fn collect_columns(&mut self, schema: &Schema, result: &mut Vec) { + for field in schema.fields() { + self.collect_field(field, result); + } + } + + fn collect_field(&mut self, field: &StructField, result: &mut Vec) { + if self.at_column_limit() { + return; + } + + self.path.push(field.name.clone()); + + match field.data_type() { + DataType::Struct(struct_type) => { + for child in struct_type.fields() { + self.collect_field(child, result); + } + } + _ => { + if self.should_include_current() { + result.push(ColumnName::new(&self.path)); + self.added_columns += 1; + } + } + } + + self.path.pop(); + } + + /// Returns true if the column limit has been reached. 
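A compact sketch of the selection rule that `StatsColumnFilter` implements, assuming a pre-flattened list of leaf-column paths instead of the kernel's nested `Schema` (the `Props` struct and `stats_columns` helper below are invented for illustration): an explicit `dataSkippingStatsColumns` list takes precedence and is not subject to a count limit; otherwise the first `dataSkippingNumIndexedCols` leaf columns (default 32) are selected.

```rust
/// Hypothetical, flattened stand-in for the table properties the real filter reads.
struct Props {
    data_skipping_stats_columns: Option<Vec<String>>, // explicit list, takes precedence
    data_skipping_num_indexed_cols: Option<u64>,      // leaf-column limit, default 32
}

/// `leaves` are dotted leaf-column paths (e.g. "nested.field") in schema order.
fn stats_columns(leaves: &[String], props: &Props) -> Vec<String> {
    if let Some(explicit) = &props.data_skipping_stats_columns {
        // Explicit column list: keep only the listed leaves; no count limit applies.
        return leaves
            .iter()
            .filter(|leaf| explicit.iter().any(|c| c == *leaf))
            .cloned()
            .collect();
    }
    // Otherwise take the first N leaf columns (struct parents do not count).
    let n = props.data_skipping_num_indexed_cols.unwrap_or(32) as usize;
    leaves.iter().take(n).cloned().collect()
}

fn main() {
    let leaves: Vec<String> = ["id", "nested.field", "value"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let props = Props {
        data_skipping_stats_columns: Some(vec!["nested.field".to_string()]),
        data_skipping_num_indexed_cols: Some(1), // ignored: the explicit list wins
    };
    assert_eq!(stats_columns(&leaves, &props), vec!["nested.field".to_string()]);
}
```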
+ fn at_column_limit(&self) -> bool { + matches!( + self.n_columns, + Some(DataSkippingNumIndexedCols::NumColumns(n)) if self.added_columns >= n + ) + } + + /// Returns true if the current path should be included based on column_names config. + fn should_include_current(&self) -> bool { + self.column_names + .as_ref() + .map(|ns| should_include_column(&ColumnName::new(&self.path), ns)) + .unwrap_or(true) + } + + /// Enters a field path for filtering decisions. + fn enter_field(&mut self, name: &str) { + self.path.push(name.to_string()); + } + + /// Exits the current field path. + fn exit_field(&mut self) { + self.path.pop(); + } + + /// Records that a leaf column was included. + fn record_included(&mut self) { + self.added_columns += 1; + } +} + /// Transforms a schema to make all fields nullable. /// Used for stats schemas where stats may not be available for all columns. pub(crate) struct NullableStatsTransform; @@ -178,44 +292,19 @@ impl<'a> SchemaTransform<'a> for NullCountStatsTransform { /// Base stats schema in this case refers the subsets of fields in the table schema /// that may be considered for stats collection. Depending on the type of stats - min/max/nullcount/... - /// additional transformations may be applied. +/// Transforms a schema to filter columns for statistics based on table properties. /// -/// The concrete shape of the schema depends on the table configuration. -/// * `dataSkippingStatsColumns` - used to explicitly specify the columns -/// to be used for data skipping statistics. (takes precedence) -/// * `dataSkippingNumIndexedCols` - used to specify the number of columns -/// to be used for data skipping statistics. Defaults to 32. -/// -/// All fields are nullable. +/// All fields in the output are nullable. #[allow(unused)] struct BaseStatsTransform { - n_columns: Option, - added_columns: u64, - column_names: Option>, - path: Vec, + filter: StatsColumnFilter, } impl BaseStatsTransform { #[allow(unused)] fn new(props: &TableProperties) -> Self { - // If data_skipping_stats_columns is specified, it takes precedence - // over data_skipping_num_indexed_cols, even if that is also specified. - if let Some(column_names) = &props.data_skipping_stats_columns { - Self { - n_columns: None, - added_columns: 0, - column_names: Some(column_names.clone()), - path: Vec::new(), - } - } else { - let n_cols = props - .data_skipping_num_indexed_cols - .unwrap_or(DataSkippingNumIndexedCols::NumColumns(32)); - Self { - n_columns: Some(n_cols), - added_columns: 0, - column_names: None, - path: Vec::new(), - } + Self { + filter: StatsColumnFilter::new(props), } } } @@ -224,34 +313,22 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform { fn transform_struct_field(&mut self, field: &'a StructField) -> Option> { use Cow::*; - // Check if the number of columns is set and if the added columns exceed the limit - // In the constructor we assert this will always be None if column_names are specified - if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns { - if self.added_columns >= n_cols { - return None; - } + if self.filter.at_column_limit() { + return None; } - self.path.push(field.name.clone()); + self.filter.enter_field(field.name()); let data_type = field.data_type(); // We always traverse struct fields (they don't count against the column limit), // but we only include leaf fields if they qualify based on column_names config. // When column_names is None, all leaf fields are included (up to n_columns limit). 
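A self-contained illustration of the push/pop path tracking behind `collect_field`, `enter_field`, and `exit_field`, using toy schema types (`ToyField`/`ToyType` are invented stand-ins for `StructField`/`DataType`): struct fields are descended into without being counted, the running path is pushed before and popped after each field, and only leaves are emitted as dotted column paths.

```rust
/// Toy schema types standing in for `StructField`/`DataType`.
enum ToyType {
    Leaf,
    Struct(Vec<ToyField>),
}
struct ToyField {
    name: String,
    data_type: ToyType,
}

/// Depth-first walk: push the field name, recurse into structs,
/// emit dotted paths for leaves, then pop (enter_field/exit_field).
fn collect_leaves(fields: &[ToyField], path: &mut Vec<String>, out: &mut Vec<String>) {
    for field in fields {
        path.push(field.name.clone());
        match &field.data_type {
            ToyType::Struct(children) => collect_leaves(children, path, out),
            ToyType::Leaf => out.push(path.join(".")),
        }
        path.pop();
    }
}

fn main() {
    let schema = vec![
        ToyField { name: "id".into(), data_type: ToyType::Leaf },
        ToyField {
            name: "nested".into(),
            data_type: ToyType::Struct(vec![ToyField {
                name: "field".into(),
                data_type: ToyType::Leaf,
            }]),
        },
    ];
    let (mut path, mut out) = (Vec::new(), Vec::new());
    collect_leaves(&schema, &mut path, &mut out);
    assert_eq!(out, ["id", "nested.field"]);
}
```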
if !matches!(data_type, DataType::Struct(_)) { - let should_include = self - .column_names - .as_ref() - .map(|ns| should_include_column(&ColumnName::new(&self.path), ns)) - .unwrap_or(true); - - if !should_include { - self.path.pop(); + if !self.filter.should_include_current() { + self.filter.exit_field(); return None; } - - // Increment count only for leaf columns - self.added_columns += 1; + self.filter.record_included(); } let field = match self.transform(&field.data_type)? { @@ -264,7 +341,7 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform { }), }; - self.path.pop(); + self.filter.exit_field(); // exclude struct fields with no children if matches!(field.data_type(), DataType::Struct(dt) if dt.fields().len() == 0) { diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 195b64f31d..6bb9669a74 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -441,9 +441,11 @@ impl Snapshot { let data_iter = writer.checkpoint_data(engine)?; let state = data_iter.state(); let lazy_data = data_iter.map(|r| r.and_then(|f| f.apply_selection_vector())); - engine - .parquet_handler() - .write_parquet_file(checkpoint_path.clone(), Box::new(lazy_data))?; + engine.parquet_handler().write_parquet_file( + checkpoint_path.clone(), + Box::new(lazy_data), + None, + )?; let file_meta = engine.storage_handler().head(&checkpoint_path)?; diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index 9080447084..cbbc8f6738 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -13,7 +13,8 @@ use std::sync::Arc; use url::Url; use crate::actions::{Metadata, Protocol}; -use crate::scan::data_skipping::stats_schema::expected_stats_schema; +use crate::expressions::ColumnName; +use crate::scan::data_skipping::stats_schema::{expected_stats_schema, stats_column_names}; use crate::schema::variant_utils::validate_variant_type_feature_support; use crate::schema::{InvariantChecker, SchemaRef, StructType}; use crate::table_features::{ @@ -181,21 +182,38 @@ impl TableConfiguration { #[allow(unused)] #[internal_api] pub(crate) fn expected_stats_schema(&self) -> DeltaResult { + let physical_schema = self.physical_data_schema(); + Ok(Arc::new(expected_stats_schema( + &physical_schema, + self.table_properties(), + )?)) + } + + /// Returns the list of column names that should have statistics collected. + /// + /// This returns the leaf column paths as a flat list of column names + /// (e.g., `["id", "nested.field"]`). + #[allow(unused)] + #[internal_api] + pub(crate) fn stats_column_names(&self) -> Vec { + let physical_schema = self.physical_data_schema(); + stats_column_names(&physical_schema, self.table_properties()) + } + + /// Returns the physical schema for data columns (excludes partition columns). + /// + /// Partition columns are excluded because statistics are only collected for data columns + /// that are physically stored in the parquet files. Partition values are stored in the + /// file path, not in the file content, so they don't have file-level statistics. + fn physical_data_schema(&self) -> StructType { let partition_columns = self.metadata().partition_columns(); let column_mapping_mode = self.column_mapping_mode(); - // Partition columns are excluded because statistics are only collected for data columns - // that are physically stored in the parquet files. Partition values are stored in the - // file path, not in the file content, so they don't have file-level statistics. 
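To make the partition-column exclusion concrete, a small sketch under simplified types (plain string lists stand in for the kernel's `StructType` and `Metadata` here): only fields physically stored in the Parquet file are candidates for file-level statistics, so partition columns are dropped before the stats columns are chosen.

```rust
use std::collections::HashSet;

/// Keep only data columns that are physically stored in the Parquet file;
/// partition values live in the file path and have no file-level stats.
fn physical_data_columns(field_names: &[&str], partition_columns: &[&str]) -> Vec<String> {
    let partitions: HashSet<&str> = partition_columns.iter().copied().collect();
    field_names
        .iter()
        .filter(|name| !partitions.contains(*name))
        .map(|name| name.to_string())
        .collect()
}

fn main() {
    let fields = ["date", "id", "value"];
    let partitions = ["date"];
    // "date" is a partition column, so it is not a stats candidate.
    assert_eq!(physical_data_columns(&fields, &partitions), ["id", "value"]);
}
```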
- let physical_schema = StructType::try_new( + StructType::new_unchecked( self.schema() .fields() .filter(|field| !partition_columns.contains(field.name())) .map(|field| field.make_physical(column_mapping_mode)), - )?; - Ok(Arc::new(expected_stats_schema( - &physical_schema, - self.table_properties(), - )?)) + ) } /// The [`Metadata`] for this table at this version. diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index a3041b1893..77be83585b 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -790,6 +790,45 @@ impl Transaction { &BASE_ADD_FILES_SCHEMA } + /// Returns the expected schema for file statistics. + /// + /// The schema structure is derived from table configuration: + /// - `delta.dataSkippingStatsColumns`: Explicit column list (if set) + /// - `delta.dataSkippingNumIndexedCols`: Column count limit (default 32) + /// - Partition columns: Always excluded + /// + /// The returned schema has the following structure: + /// ```ignore + /// { + /// numRecords: long, + /// nullCount: { ... }, // Nested struct mirroring data schema, all fields LONG + /// minValues: { ... }, // Nested struct, only min/max eligible types + /// maxValues: { ... }, // Nested struct, only min/max eligible types + /// tightBounds: boolean, + /// } + /// ``` + /// + /// Engines should collect statistics matching this schema structure when writing files. + #[allow(unused)] + pub fn stats_schema(&self) -> DeltaResult { + self.read_snapshot + .table_configuration() + .expected_stats_schema() + } + + /// Returns the list of column names that should have statistics collected. + /// + /// This returns the leaf column paths as a flat list of column names + /// (e.g., `["id", "nested.field"]`). + /// + /// Engines can use this to determine which columns need stats during writes. + #[allow(unused)] + pub fn stats_columns(&self) -> Vec { + self.read_snapshot + .table_configuration() + .stats_column_names() + } + // Generate the logical-to-physical transform expression which must be evaluated on every data // chunk before writing. At the moment, this is a transaction-wide expression. fn generate_logical_to_physical(&self) -> Expression { @@ -820,6 +859,8 @@ impl Transaction { // Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure // that engines cannot call this method after a metadata change, since the write context could // have invalid metadata. + // Note: Callers that use get_write_context may be writing data to the table and they might + // have invalid metadata. pub fn get_write_context(&self) -> WriteContext { let target_dir = self.read_snapshot.table_root(); let snapshot_schema = self.read_snapshot.schema(); @@ -837,11 +878,19 @@ impl Transaction { .cloned(); let physical_schema = Arc::new(StructType::new_unchecked(physical_fields)); + // Get stats columns from table configuration + let stats_columns = self + .stats_columns() + .into_iter() + .map(|c| c.to_string()) + .collect(); + WriteContext::new( target_dir.clone(), snapshot_schema, physical_schema, Arc::new(logical_to_physical), + stats_columns, ) } @@ -1313,6 +1362,8 @@ pub struct WriteContext { logical_schema: SchemaRef, physical_schema: SchemaRef, logical_to_physical: ExpressionRef, + /// Column names that should have statistics collected during writes. 
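To show the kind of per-file statistics an engine is expected to produce for the columns reported by `stats_columns()`, a toy, dependency-free sketch (the single-column batch and manual JSON rendering are invented for illustration; a real engine would derive these from the data it actually wrote and serialize them robustly):

```rust
/// Compute (min, max, null count) for a toy single-column batch of nullable i64 values.
fn column_stats(values: &[Option<i64>]) -> (Option<i64>, Option<i64>, usize) {
    let present = values.iter().flatten();
    let min = present.clone().min().copied();
    let max = present.max().copied();
    let null_count = values.iter().filter(|v| v.is_none()).count();
    (min, max, null_count)
}

fn main() {
    // Pretend this is the data written for stats column "id".
    let id_values = [Some(3_i64), None, Some(7), Some(1)];
    let (min, max, nulls) = column_stats(&id_values);
    // Shape mirrors the documented stats schema:
    // numRecords, nullCount, minValues, maxValues, tightBounds.
    let stats_json = format!(
        r#"{{"numRecords":{},"nullCount":{{"id":{}}},"minValues":{{"id":{}}},"maxValues":{{"id":{}}},"tightBounds":true}}"#,
        id_values.len(),
        nulls,
        min.unwrap(),
        max.unwrap(),
    );
    println!("{stats_json}");
}
```

A real stats string must follow the nested schema returned by `Transaction::stats_schema()`, with `nullCount`/`minValues`/`maxValues` mirroring the data schema's structure rather than the flat single-column map shown here.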
+ stats_columns: Vec, } impl WriteContext { @@ -1321,12 +1372,14 @@ impl WriteContext { logical_schema: SchemaRef, physical_schema: SchemaRef, logical_to_physical: ExpressionRef, + stats_columns: Vec, ) -> Self { WriteContext { target_dir, logical_schema, physical_schema, logical_to_physical, + stats_columns, } } @@ -1346,6 +1399,13 @@ impl WriteContext { self.logical_to_physical.clone() } + /// Returns the column names that should have statistics collected during writes. + /// + /// Based on table configuration (dataSkippingNumIndexedCols, dataSkippingStatsColumns). + pub fn stats_columns(&self) -> &[String] { + &self.stats_columns + } + /// Generate a new unique absolute URL for a deletion vector file. /// /// This method generates a unique file name in the table directory. diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index f47db4b92e..8212236889 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1078,6 +1078,7 @@ async fn test_append_variant() -> Result<(), Box> { write_context.target_dir(), Box::new(ArrowEngineData::new(data.clone())), HashMap::new(), + Some(write_context.stats_columns()), ) .await?; @@ -1251,6 +1252,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box
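As a closing illustration of the caller-side flow the write path now follows, with stand-in types only (`WriteContextStub` and the free `write_parquet_file` function below are invented; the real `Transaction`, `WriteContext`, and parquet handlers carry much more state): the stats columns travel from table configuration into the write context and from there into every `write_parquet_file` call.

```rust
/// Stand-ins for the kernel's WriteContext and parquet handler; only the
/// stats-column plumbing is modeled here.
struct WriteContextStub {
    target_dir: String,
    stats_columns: Vec<String>,
}

impl WriteContextStub {
    fn stats_columns(&self) -> &[String] {
        &self.stats_columns
    }
}

fn write_parquet_file(location: &str, rows: usize, stats_columns: Option<&[String]>) {
    // A real handler would write `rows` of data and collect min/max/nullCount
    // for exactly the columns named in `stats_columns`.
    println!(
        "writing {rows} rows to {location}, stats for {:?}",
        stats_columns.unwrap_or(&[])
    );
}

fn main() {
    // In the kernel these values come from Transaction::get_write_context(), which
    // reads dataSkippingStatsColumns / dataSkippingNumIndexedCols from table config.
    let ctx = WriteContextStub {
        target_dir: "file:///tmp/table".to_string(),
        stats_columns: vec!["id".to_string(), "nested.field".to_string()],
    };
    write_parquet_file(&ctx.target_dir, 4, Some(ctx.stats_columns()));
}
```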