Skip to content

Commit ba3b18f

Browse files
committed
api
1 parent d4ecc0a commit ba3b18f

File tree

3 files changed

+213
-56
lines changed

3 files changed

+213
-56
lines changed

kernel/src/scan/data_skipping/stats_schema.rs

Lines changed: 124 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,120 @@ pub(crate) fn expected_stats_schema(
133133
StructType::try_new(fields)
134134
}
135135

136+
/// Returns the list of column names that should have statistics collected.
137+
///
138+
/// This extracts just the column names without building the full stats schema,
139+
/// making it more efficient when only the column list is needed.
140+
#[allow(unused)]
141+
pub(crate) fn stats_column_names(
142+
physical_file_schema: &Schema,
143+
table_properties: &TableProperties,
144+
) -> Vec<ColumnName> {
145+
let mut filter = StatsColumnFilter::new(table_properties);
146+
let mut columns = Vec::new();
147+
filter.collect_columns(physical_file_schema, &mut columns);
148+
columns
149+
}
150+
151+
/// Handles column filtering logic for statistics based on table properties.
152+
///
153+
/// Filters columns according to:
154+
/// * `dataSkippingStatsColumns` - explicit list of columns to include (takes precedence)
155+
/// * `dataSkippingNumIndexedCols` - number of leaf columns to include (default 32)
156+
struct StatsColumnFilter {
157+
n_columns: Option<DataSkippingNumIndexedCols>,
158+
added_columns: u64,
159+
column_names: Option<Vec<ColumnName>>,
160+
path: Vec<String>,
161+
}
162+
163+
impl StatsColumnFilter {
164+
fn new(props: &TableProperties) -> Self {
165+
// If data_skipping_stats_columns is specified, it takes precedence
166+
// over data_skipping_num_indexed_cols, even if that is also specified.
167+
if let Some(column_names) = &props.data_skipping_stats_columns {
168+
Self {
169+
n_columns: None,
170+
added_columns: 0,
171+
column_names: Some(column_names.clone()),
172+
path: Vec::new(),
173+
}
174+
} else {
175+
let n_cols = props
176+
.data_skipping_num_indexed_cols
177+
.unwrap_or(DataSkippingNumIndexedCols::NumColumns(32));
178+
Self {
179+
n_columns: Some(n_cols),
180+
added_columns: 0,
181+
column_names: None,
182+
path: Vec::new(),
183+
}
184+
}
185+
}
186+
187+
/// Collects column names that should have statistics.
188+
fn collect_columns(&mut self, schema: &Schema, result: &mut Vec<ColumnName>) {
189+
for field in schema.fields() {
190+
self.collect_field(field, result);
191+
}
192+
}
193+
194+
fn collect_field(&mut self, field: &StructField, result: &mut Vec<ColumnName>) {
195+
if self.at_column_limit() {
196+
return;
197+
}
198+
199+
self.path.push(field.name.clone());
200+
201+
match field.data_type() {
202+
DataType::Struct(struct_type) => {
203+
for child in struct_type.fields() {
204+
self.collect_field(child, result);
205+
}
206+
}
207+
_ => {
208+
if self.should_include_current() {
209+
result.push(ColumnName::new(&self.path));
210+
self.added_columns += 1;
211+
}
212+
}
213+
}
214+
215+
self.path.pop();
216+
}
217+
218+
/// Returns true if the column limit has been reached.
219+
fn at_column_limit(&self) -> bool {
220+
matches!(
221+
self.n_columns,
222+
Some(DataSkippingNumIndexedCols::NumColumns(n)) if self.added_columns >= n
223+
)
224+
}
225+
226+
/// Returns true if the current path should be included based on column_names config.
227+
fn should_include_current(&self) -> bool {
228+
self.column_names
229+
.as_ref()
230+
.map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
231+
.unwrap_or(true)
232+
}
233+
234+
/// Enters a field path for filtering decisions.
235+
fn enter_field(&mut self, name: &str) {
236+
self.path.push(name.to_string());
237+
}
238+
239+
/// Exits the current field path.
240+
fn exit_field(&mut self) {
241+
self.path.pop();
242+
}
243+
244+
/// Records that a leaf column was included.
245+
fn record_included(&mut self) {
246+
self.added_columns += 1;
247+
}
248+
}
249+
136250
/// Transforms a schema to make all fields nullable.
137251
/// Used for stats schemas where stats may not be available for all columns.
138252
pub(crate) struct NullableStatsTransform;
@@ -182,40 +296,17 @@ impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
182296
/// The concrete shape of the schema depends on the table configuration.
183297
/// * `dataSkippingStatsColumns` - used to explicitly specify the columns
184298
/// to be used for data skipping statistics. (takes precedence)
185-
/// * `dataSkippingNumIndexedCols` - used to specify the number of columns
186-
/// to be used for data skipping statistics. Defaults to 32.
187-
///
188299
/// All fields are nullable.
189300
#[allow(unused)]
190301
struct BaseStatsTransform {
191-
n_columns: Option<DataSkippingNumIndexedCols>,
192-
added_columns: u64,
193-
column_names: Option<Vec<ColumnName>>,
194-
path: Vec<String>,
302+
filter: StatsColumnFilter,
195303
}
196304

197305
impl BaseStatsTransform {
198306
#[allow(unused)]
199307
fn new(props: &TableProperties) -> Self {
200-
// If data_skipping_stats_columns is specified, it takes precedence
201-
// over data_skipping_num_indexed_cols, even if that is also specified.
202-
if let Some(column_names) = &props.data_skipping_stats_columns {
203-
Self {
204-
n_columns: None,
205-
added_columns: 0,
206-
column_names: Some(column_names.clone()),
207-
path: Vec::new(),
208-
}
209-
} else {
210-
let n_cols = props
211-
.data_skipping_num_indexed_cols
212-
.unwrap_or(DataSkippingNumIndexedCols::NumColumns(32));
213-
Self {
214-
n_columns: Some(n_cols),
215-
added_columns: 0,
216-
column_names: None,
217-
path: Vec::new(),
218-
}
308+
Self {
309+
filter: StatsColumnFilter::new(props),
219310
}
220311
}
221312
}
@@ -224,34 +315,22 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform {
224315
fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
225316
use Cow::*;
226317

227-
// Check if the number of columns is set and if the added columns exceed the limit
228-
// In the constructor we assert this will always be None if column_names are specified
229-
if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns {
230-
if self.added_columns >= n_cols {
231-
return None;
232-
}
318+
if self.filter.at_column_limit() {
319+
return None;
233320
}
234321

235-
self.path.push(field.name.clone());
322+
self.filter.enter_field(field.name());
236323
let data_type = field.data_type();
237324

238325
// We always traverse struct fields (they don't count against the column limit),
239326
// but we only include leaf fields if they qualify based on column_names config.
240327
// When column_names is None, all leaf fields are included (up to n_columns limit).
241328
if !matches!(data_type, DataType::Struct(_)) {
242-
let should_include = self
243-
.column_names
244-
.as_ref()
245-
.map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
246-
.unwrap_or(true);
247-
248-
if !should_include {
249-
self.path.pop();
329+
if !self.filter.should_include_current() {
330+
self.filter.exit_field();
250331
return None;
251332
}
252-
253-
// Increment count only for leaf columns
254-
self.added_columns += 1;
333+
self.filter.record_included();
255334
}
256335

257336
let field = match self.transform(&field.data_type)? {
@@ -264,7 +343,7 @@ impl<'a> SchemaTransform<'a> for BaseStatsTransform {
264343
}),
265344
};
266345

267-
self.path.pop();
346+
self.filter.exit_field();
268347

269348
// exclude struct fields with no children
270349
if matches!(field.data_type(), DataType::Struct(dt) if dt.fields().len() == 0) {

kernel/src/table_configuration.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ use std::sync::Arc;
1313
use url::Url;
1414

1515
use crate::actions::{Metadata, Protocol};
16-
use crate::scan::data_skipping::stats_schema::expected_stats_schema;
16+
use crate::expressions::ColumnName;
17+
use crate::scan::data_skipping::stats_schema::{expected_stats_schema, stats_column_names};
1718
use crate::schema::variant_utils::validate_variant_type_feature_support;
1819
use crate::schema::{InvariantChecker, SchemaRef, StructType};
1920
use crate::table_features::{
@@ -181,21 +182,38 @@ impl TableConfiguration {
181182
#[allow(unused)]
182183
#[internal_api]
183184
pub(crate) fn expected_stats_schema(&self) -> DeltaResult<SchemaRef> {
185+
let physical_schema = self.physical_data_schema();
186+
Ok(Arc::new(expected_stats_schema(
187+
&physical_schema,
188+
self.table_properties(),
189+
)?))
190+
}
191+
192+
/// Returns the list of column names that should have statistics collected.
193+
///
194+
/// This returns the leaf column paths as a flat list of column names
195+
/// (e.g., `["id", "nested.field"]`).
196+
#[allow(unused)]
197+
#[internal_api]
198+
pub(crate) fn stats_column_names(&self) -> Vec<ColumnName> {
199+
let physical_schema = self.physical_data_schema();
200+
stats_column_names(&physical_schema, self.table_properties())
201+
}
202+
203+
/// Returns the physical schema for data columns (excludes partition columns).
204+
///
205+
/// Partition columns are excluded because statistics are only collected for data columns
206+
/// that are physically stored in the parquet files. Partition values are stored in the
207+
/// file path, not in the file content, so they don't have file-level statistics.
208+
fn physical_data_schema(&self) -> StructType {
184209
let partition_columns = self.metadata().partition_columns();
185210
let column_mapping_mode = self.column_mapping_mode();
186-
// Partition columns are excluded because statistics are only collected for data columns
187-
// that are physically stored in the parquet files. Partition values are stored in the
188-
// file path, not in the file content, so they don't have file-level statistics.
189-
let physical_schema = StructType::try_new(
211+
StructType::new_unchecked(
190212
self.schema()
191213
.fields()
192214
.filter(|field| !partition_columns.contains(field.name()))
193215
.map(|field| field.make_physical(column_mapping_mode)),
194-
)?;
195-
Ok(Arc::new(expected_stats_schema(
196-
&physical_schema,
197-
self.table_properties(),
198-
)?))
216+
)
199217
}
200218

201219
/// The [`Metadata`] for this table at this version.

kernel/src/transaction/mod.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,45 @@ impl Transaction {
790790
&BASE_ADD_FILES_SCHEMA
791791
}
792792

793+
/// Returns the expected schema for file statistics.
794+
///
795+
/// The schema structure is derived from table configuration:
796+
/// - `delta.dataSkippingStatsColumns`: Explicit column list (if set)
797+
/// - `delta.dataSkippingNumIndexedCols`: Column count limit (default 32)
798+
/// - Partition columns: Always excluded
799+
///
800+
/// The returned schema has the following structure:
801+
/// ```ignore
802+
/// {
803+
/// numRecords: long,
804+
/// nullCount: { ... }, // Nested struct mirroring data schema, all fields LONG
805+
/// minValues: { ... }, // Nested struct, only min/max eligible types
806+
/// maxValues: { ... }, // Nested struct, only min/max eligible types
807+
/// tightBounds: boolean,
808+
/// }
809+
/// ```
810+
///
811+
/// Engines should collect statistics matching this schema structure when writing files.
812+
#[allow(unused)]
813+
pub fn stats_schema(&self) -> DeltaResult<SchemaRef> {
814+
self.read_snapshot
815+
.table_configuration()
816+
.expected_stats_schema()
817+
}
818+
819+
/// Returns the list of column names that should have statistics collected.
820+
///
821+
/// This returns the leaf column paths as a flat list of column names
822+
/// (e.g., `["id", "nested.field"]`).
823+
///
824+
/// Engines can use this to determine which columns need stats during writes.
825+
#[allow(unused)]
826+
pub fn stats_columns(&self) -> Vec<ColumnName> {
827+
self.read_snapshot
828+
.table_configuration()
829+
.stats_column_names()
830+
}
831+
793832
// Generate the logical-to-physical transform expression which must be evaluated on every data
794833
// chunk before writing. At the moment, this is a transaction-wide expression.
795834
fn generate_logical_to_physical(&self) -> Expression {
@@ -820,6 +859,8 @@ impl Transaction {
820859
// Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure
821860
// that engines cannot call this method after a metadata change, since the write context could
822861
// have invalid metadata.
862+
// Note: callers that use get_write_context may be writing data to the table, and a
863+
// metadata change mid-transaction could otherwise leave them with invalid metadata.
823864
pub fn get_write_context(&self) -> WriteContext {
824865
let target_dir = self.read_snapshot.table_root();
825866
let snapshot_schema = self.read_snapshot.schema();
@@ -837,11 +878,19 @@ impl Transaction {
837878
.cloned();
838879
let physical_schema = Arc::new(StructType::new_unchecked(physical_fields));
839880

881+
// Get stats columns from table configuration
882+
let stats_columns = self
883+
.stats_columns()
884+
.into_iter()
885+
.map(|c| c.to_string())
886+
.collect();
887+
840888
WriteContext::new(
841889
target_dir.clone(),
842890
snapshot_schema,
843891
physical_schema,
844892
Arc::new(logical_to_physical),
893+
stats_columns,
845894
)
846895
}
847896

@@ -1313,6 +1362,8 @@ pub struct WriteContext {
13131362
logical_schema: SchemaRef,
13141363
physical_schema: SchemaRef,
13151364
logical_to_physical: ExpressionRef,
1365+
/// Column names that should have statistics collected during writes.
1366+
stats_columns: Vec<String>,
13161367
}
13171368

13181369
impl WriteContext {
@@ -1321,12 +1372,14 @@ impl WriteContext {
13211372
logical_schema: SchemaRef,
13221373
physical_schema: SchemaRef,
13231374
logical_to_physical: ExpressionRef,
1375+
stats_columns: Vec<String>,
13241376
) -> Self {
13251377
WriteContext {
13261378
target_dir,
13271379
logical_schema,
13281380
physical_schema,
13291381
logical_to_physical,
1382+
stats_columns,
13301383
}
13311384
}
13321385

@@ -1346,6 +1399,13 @@ impl WriteContext {
13461399
self.logical_to_physical.clone()
13471400
}
13481401

1402+
/// Returns the column names that should have statistics collected during writes.
1403+
///
1404+
/// Based on table configuration (dataSkippingNumIndexedCols, dataSkippingStatsColumns).
1405+
pub fn stats_columns(&self) -> &[String] {
1406+
&self.stats_columns
1407+
}
1408+
13491409
/// Generate a new unique absolute URL for a deletion vector file.
13501410
///
13511411
/// This method generates a unique file name in the table directory.
@@ -1362,7 +1422,7 @@ impl WriteContext {
13621422
/// # Examples
13631423
///
13641424
/// ```rust,ignore
1365-
/// let write_context = transaction.get_write_context();
1425+
/// let write_context = transaction.get_write_context();
13661426
/// let dv_path = write_context.new_deletion_vector_path(String::from(rand_string()));
13671427
/// // dv_url might be: s3://bucket/table/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin
13681428
/// ```

0 commit comments

Comments
 (0)