175 changes: 126 additions & 49 deletions kernel/src/scan/data_skipping/stats_schema.rs
@@ -133,6 +133,120 @@ pub(crate) fn expected_stats_schema(
StructType::try_new(fields)
}

/// Returns the list of column names that should have statistics collected.
///
/// This extracts just the column names without building the full stats schema,
/// making it more efficient when only the column list is needed.
#[allow(unused)]
pub(crate) fn stats_column_names(
physical_file_schema: &Schema,
table_properties: &TableProperties,
) -> Vec<ColumnName> {
let mut filter = StatsColumnFilter::new(table_properties);
let mut columns = Vec::new();
filter.collect_columns(physical_file_schema, &mut columns);
columns
}
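
// Hedged behavior sketch (hypothetical schema and properties, not exercising the real
// kernel types): for a physical schema { id: long, user: { name: string, age: long } }
// with delta.dataSkippingNumIndexedCols = 2, stats_column_names returns
//
//     ["id", "user.name"]
//
// Nested leaves flatten into dotted paths and count against the limit in schema order;
// the enclosing `user` struct itself does not consume a slot.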

/// Handles column filtering logic for statistics based on table properties.
///
/// Filters columns according to:
/// * `dataSkippingStatsColumns` - explicit list of columns to include (takes precedence)
/// * `dataSkippingNumIndexedCols` - number of leaf columns to include (default 32)
struct StatsColumnFilter {
n_columns: Option<DataSkippingNumIndexedCols>,
added_columns: u64,
column_names: Option<Vec<ColumnName>>,
path: Vec<String>,
}

impl StatsColumnFilter {
fn new(props: &TableProperties) -> Self {
// If data_skipping_stats_columns is specified, it takes precedence
// over data_skipping_num_indexed_cols, even if that is also specified.
if let Some(column_names) = &props.data_skipping_stats_columns {
Self {
n_columns: None,
added_columns: 0,
column_names: Some(column_names.clone()),
path: Vec::new(),
}
} else {
let n_cols = props
.data_skipping_num_indexed_cols
.unwrap_or(DataSkippingNumIndexedCols::NumColumns(32));
Self {
n_columns: Some(n_cols),
added_columns: 0,
column_names: None,
path: Vec::new(),
}
}
}

/// Collects column names that should have statistics.
fn collect_columns(&mut self, schema: &Schema, result: &mut Vec<ColumnName>) {
for field in schema.fields() {
self.collect_field(field, result);
}
}

fn collect_field(&mut self, field: &StructField, result: &mut Vec<ColumnName>) {
if self.at_column_limit() {
return;
}

self.path.push(field.name.clone());

match field.data_type() {
DataType::Struct(struct_type) => {
for child in struct_type.fields() {
self.collect_field(child, result);
}
}
_ => {
if self.should_include_current() {
result.push(ColumnName::new(&self.path));
self.added_columns += 1;
}
}
}

self.path.pop();
}

/// Returns true if the column limit has been reached.
fn at_column_limit(&self) -> bool {
matches!(
self.n_columns,
Some(DataSkippingNumIndexedCols::NumColumns(n)) if self.added_columns >= n
)
}

/// Returns true if the current path should be included based on column_names config.
fn should_include_current(&self) -> bool {
self.column_names
.as_ref()
.map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
.unwrap_or(true)
}

/// Enters a field path for filtering decisions.
fn enter_field(&mut self, name: &str) {
self.path.push(name.to_string());
}

/// Exits the current field path.
fn exit_field(&mut self) {
self.path.pop();
}

/// Records that a leaf column was included.
fn record_included(&mut self) {
self.added_columns += 1;
}
}
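
// Standalone sketch of the traversal strategy above, using toy stand-ins rather than
// the kernel's Schema/StructField types (all names here are illustrative only): a
// depth-first walk flattens nested fields into dotted leaf paths and stops once the
// column limit is reached. Struct nodes never count against the limit themselves.
#[cfg(test)]
mod leaf_walk_sketch {
    enum ToyType {
        Leaf,
        Struct(Vec<(String, ToyType)>),
    }

    // Depth-first collection of leaf paths, capped at `limit` leaves.
    fn collect(
        fields: &[(String, ToyType)],
        limit: usize,
        path: &mut Vec<String>,
        out: &mut Vec<String>,
    ) {
        for (name, dt) in fields {
            if out.len() >= limit {
                return;
            }
            path.push(name.clone());
            match dt {
                ToyType::Struct(children) => collect(children, limit, path, out),
                ToyType::Leaf => out.push(path.join(".")),
            }
            path.pop();
        }
    }

    #[test]
    fn nested_leaves_flatten_and_respect_limit() {
        let schema = vec![
            ("id".to_string(), ToyType::Leaf),
            (
                "user".to_string(),
                ToyType::Struct(vec![
                    ("name".to_string(), ToyType::Leaf),
                    ("age".to_string(), ToyType::Leaf),
                ]),
            ),
        ];
        let mut out = Vec::new();
        collect(&schema, 2, &mut Vec::new(), &mut out);
        assert_eq!(out, vec!["id".to_string(), "user.name".to_string()]);
    }
}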

/// Transforms a schema to make all fields nullable.
/// Used for stats schemas where stats may not be available for all columns.
pub(crate) struct NullableStatsTransform;
@@ -178,44 +292,19 @@ impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
/// Base stats schema in this case refers to the subset of fields in the table schema
/// that may be considered for stats collection. Depending on the type of stats - min/max/nullcount/... -
/// additional transformations may be applied.
/// Transforms a schema to filter columns for statistics based on table properties.
///
/// The concrete shape of the schema depends on the table configuration.
/// * `dataSkippingStatsColumns` - used to explicitly specify the columns
/// to be used for data skipping statistics. (takes precedence)
/// * `dataSkippingNumIndexedCols` - used to specify the number of columns
/// to be used for data skipping statistics. Defaults to 32.
///
/// All fields are nullable.
/// All fields in the output are nullable.
#[allow(unused)]
struct BaseStatsTransform {
n_columns: Option<DataSkippingNumIndexedCols>,
added_columns: u64,
column_names: Option<Vec<ColumnName>>,
path: Vec<String>,
filter: StatsColumnFilter,
}

impl BaseStatsTransform {
#[allow(unused)]
fn new(props: &TableProperties) -> Self {
// If data_skipping_stats_columns is specified, it takes precedence
// over data_skipping_num_indexed_cols, even if that is also specified.
if let Some(column_names) = &props.data_skipping_stats_columns {
Self {
n_columns: None,
added_columns: 0,
column_names: Some(column_names.clone()),
path: Vec::new(),
}
} else {
let n_cols = props
.data_skipping_num_indexed_cols
.unwrap_or(DataSkippingNumIndexedCols::NumColumns(32));
Self {
n_columns: Some(n_cols),
added_columns: 0,
column_names: None,
path: Vec::new(),
}
Self {
filter: StatsColumnFilter::new(props),
}
}
}
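
// Hedged example of the precedence rule, with hypothetical property values: if a
// table sets both
//
//     delta.dataSkippingStatsColumns   = "id,user.name"
//     delta.dataSkippingNumIndexedCols = 1
//
// then stats are collected for exactly `id` and `user.name`; the numeric limit is
// ignored because the explicit column list takes precedence.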
@@ -224,34 +313,22 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform {
fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
use Cow::*;

// Check if the number of columns is set and if the added columns exceed the limit
// In the constructor we assert this will always be None if column_names are specified
if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns {
if self.added_columns >= n_cols {
return None;
}
if self.filter.at_column_limit() {
return None;
}

self.path.push(field.name.clone());
self.filter.enter_field(field.name());
let data_type = field.data_type();

// We always traverse struct fields (they don't count against the column limit),
// but we only include leaf fields if they qualify based on column_names config.
// When column_names is None, all leaf fields are included (up to n_columns limit).
if !matches!(data_type, DataType::Struct(_)) {
let should_include = self
.column_names
.as_ref()
.map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
.unwrap_or(true);

if !should_include {
self.path.pop();
if !self.filter.should_include_current() {
self.filter.exit_field();
return None;
}

// Increment count only for leaf columns
self.added_columns += 1;
self.filter.record_included();
}

let field = match self.transform(&field.data_type)? {
Expand All @@ -264,7 +341,7 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform {
}),
};

self.path.pop();
self.filter.exit_field();

// exclude struct fields with no children
if matches!(field.data_type(), DataType::Struct(dt) if dt.fields().len() == 0) {
38 changes: 28 additions & 10 deletions kernel/src/table_configuration.rs
@@ -13,7 +13,8 @@ use std::sync::Arc;
use url::Url;

use crate::actions::{Metadata, Protocol};
use crate::scan::data_skipping::stats_schema::expected_stats_schema;
use crate::expressions::ColumnName;
use crate::scan::data_skipping::stats_schema::{expected_stats_schema, stats_column_names};
use crate::schema::variant_utils::validate_variant_type_feature_support;
use crate::schema::{InvariantChecker, SchemaRef, StructType};
use crate::table_features::{
@@ -181,21 +182,38 @@ impl TableConfiguration {
#[allow(unused)]
#[internal_api]
pub(crate) fn expected_stats_schema(&self) -> DeltaResult<SchemaRef> {
let physical_schema = self.physical_data_schema();
Ok(Arc::new(expected_stats_schema(
&physical_schema,
self.table_properties(),
)?))
}

/// Returns the list of column names that should have statistics collected.
///
/// This returns the leaf column paths as a flat list of column names
/// (e.g., `["id", "nested.field"]`).
#[allow(unused)]
#[internal_api]
pub(crate) fn stats_column_names(&self) -> Vec<ColumnName> {
let physical_schema = self.physical_data_schema();
stats_column_names(&physical_schema, self.table_properties())
}

/// Returns the physical schema for data columns (excludes partition columns).
///
/// Partition columns are excluded because statistics are only collected for data columns
/// that are physically stored in the parquet files. Partition values are stored in the
/// file path, not in the file content, so they don't have file-level statistics.
fn physical_data_schema(&self) -> StructType {
let partition_columns = self.metadata().partition_columns();
let column_mapping_mode = self.column_mapping_mode();
// Partition columns are excluded because statistics are only collected for data columns
// that are physically stored in the parquet files. Partition values are stored in the
// file path, not in the file content, so they don't have file-level statistics.
let physical_schema = StructType::try_new(
StructType::new_unchecked(
self.schema()
.fields()
.filter(|field| !partition_columns.contains(field.name()))
.map(|field| field.make_physical(column_mapping_mode)),
)?;
Ok(Arc::new(expected_stats_schema(
&physical_schema,
self.table_properties(),
)?))
)
}
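
    // Hedged example (hypothetical table layout): for a table partitioned by `date`
    // with data columns `id` and `name`, this returns a schema containing only
    // { id, name }. The `date` value lives in file paths such as
    // `date=2024-01-01/part-00000.parquet`, not in the parquet footer, so it can
    // never carry file-level min/max/null-count statistics.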

/// The [`Metadata`] for this table at this version.
60 changes: 60 additions & 0 deletions kernel/src/transaction/mod.rs
@@ -790,6 +790,45 @@ impl Transaction {
&BASE_ADD_FILES_SCHEMA
}

/// Returns the expected schema for file statistics.
///
/// The schema structure is derived from table configuration:
/// - `delta.dataSkippingStatsColumns`: Explicit column list (if set)
/// - `delta.dataSkippingNumIndexedCols`: Column count limit (default 32)
/// - Partition columns: Always excluded
///
/// The returned schema has the following structure:
/// ```ignore
/// {
/// numRecords: long,
/// nullCount: { ... }, // Nested struct mirroring data schema, all fields LONG
/// minValues: { ... }, // Nested struct, only min/max eligible types
/// maxValues: { ... }, // Nested struct, only min/max eligible types
/// tightBounds: boolean,
/// }
/// ```
///
/// Engines should collect statistics matching this schema structure when writing files.
#[allow(unused)]
pub fn stats_schema(&self) -> DeltaResult<SchemaRef> {
self.read_snapshot
.table_configuration()
.expected_stats_schema()
}
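
    // Illustrative only: a stats string an engine might attach to an add action for
    // a table with columns `id: long` and `name: string`. The column names and
    // values are hypothetical; the shape follows the schema documented above.
    #[allow(unused)]
    const EXAMPLE_STATS: &'static str = r#"{
        "numRecords": 100,
        "nullCount": { "id": 0, "name": 3 },
        "minValues": { "id": 1, "name": "aardvark" },
        "maxValues": { "id": 100, "name": "zebra" },
        "tightBounds": true
    }"#;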

/// Returns the list of column names that should have statistics collected.
///
/// This returns the leaf column paths as a flat list of column names
/// (e.g., `["id", "nested.field"]`).
///
/// Engines can use this to determine which columns need stats during writes.
#[allow(unused)]
pub fn stats_columns(&self) -> Vec<ColumnName> {
self.read_snapshot
.table_configuration()
.stats_column_names()
}
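
    // Hedged usage sketch (`txn` is a hypothetical in-scope Transaction):
    //
    //     let cols = txn.stats_columns();
    //     // e.g. the columns `id` and `user.name` for the configurations above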

// Generate the logical-to-physical transform expression which must be evaluated on every data
// chunk before writing. At the moment, this is a transaction-wide expression.
fn generate_logical_to_physical(&self) -> Expression {
@@ -820,6 +859,8 @@
// Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure
// that engines cannot call this method after a metadata change, since the write context could
// have invalid metadata.
// Note: callers that use get_write_context may still be writing data to the table
// when such a metadata change occurs, so the context they hold could carry invalid metadata.
pub fn get_write_context(&self) -> WriteContext {
let target_dir = self.read_snapshot.table_root();
let snapshot_schema = self.read_snapshot.schema();
@@ -837,11 +878,19 @@
.cloned();
let physical_schema = Arc::new(StructType::new_unchecked(physical_fields));

// Get stats columns from table configuration
let stats_columns = self
.stats_columns()
.into_iter()
.map(|c| c.to_string())
.collect();

WriteContext::new(
target_dir.clone(),
snapshot_schema,
physical_schema,
Arc::new(logical_to_physical),
stats_columns,
)
}

@@ -1313,6 +1362,8 @@ pub struct WriteContext {
logical_schema: SchemaRef,
physical_schema: SchemaRef,
logical_to_physical: ExpressionRef,
/// Column names that should have statistics collected during writes.
stats_columns: Vec<String>,
}

impl WriteContext {
@@ -1321,12 +1372,14 @@ impl WriteContext {
logical_schema: SchemaRef,
physical_schema: SchemaRef,
logical_to_physical: ExpressionRef,
stats_columns: Vec<String>,
) -> Self {
WriteContext {
target_dir,
logical_schema,
physical_schema,
logical_to_physical,
stats_columns,
}
}

@@ -1346,6 +1399,13 @@
self.logical_to_physical.clone()
}

/// Returns the column names that should have statistics collected during writes.
///
/// Based on table configuration (dataSkippingNumIndexedCols, dataSkippingStatsColumns).
pub fn stats_columns(&self) -> &[String] {
&self.stats_columns
}
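
    // Hedged engine-side sketch (hypothetical names, not part of this API): an
    // engine can turn the list into a membership test when configuring its writer.
    //
    //     let wants_stats = |col: &str| ctx.stats_columns().iter().any(|c| c == col);
    //     parquet_writer.collect_stats_where(wants_stats); // hypothetical engine call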

/// Generate a new unique absolute URL for a deletion vector file.
///
/// This method generates a unique file name in the table directory.