Commit aca26f8

feat: add stats_columns to ParquetHandler + Transaction stats API
- Add stats_columns parameter to write_parquet_file trait
- Add stats_schema(), stats_columns(), get_clustering_columns() to Transaction
- Add stats_columns to WriteContext
- Update get_write_context() to take engine parameter
- Add clustering column support to expected_stats_schema()
1 parent: d4ecc0a

11 files changed (+350, -58 lines)

ffi/src/transaction/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -288,7 +288,9 @@ mod tests {
             ))
         };

-        let write_context = unsafe { get_write_context(txn_with_engine_info.shallow_copy()) };
+        let write_context = ok_or_panic(unsafe {
+            get_write_context(txn_with_engine_info.shallow_copy(), engine.shallow_copy())
+        });

         // Ensure we get the correct schema
         let write_schema = unsafe { get_write_schema(write_context.shallow_copy()) };

ffi/src/transaction/write_context.rs

Lines changed: 19 additions & 4 deletions
@@ -1,5 +1,9 @@
+use crate::error::{ExternResult, IntoExternResult};
 use crate::handle::Handle;
-use crate::{kernel_string_slice, AllocateStringFn, NullableCvoid, SharedSchema};
+use crate::{
+    kernel_string_slice, AllocateStringFn, DeltaResult, ExternEngine, NullableCvoid,
+    SharedExternEngine, SharedSchema,
+};
 use delta_kernel::transaction::WriteContext;
 use delta_kernel_ffi_macros::handle_descriptor;

@@ -19,13 +23,24 @@ pub struct SharedWriteContext;
 ///
 /// # Safety
 ///
-/// Caller is responsible for passing a [valid][Handle#Validity] transaction handle.
+/// Caller is responsible for passing a [valid][Handle#Validity] transaction handle and engine handle.
 #[no_mangle]
 pub unsafe extern "C" fn get_write_context(
     txn: Handle<ExclusiveTransaction>,
-) -> Handle<SharedWriteContext> {
+    engine: Handle<SharedExternEngine>,
+) -> ExternResult<Handle<SharedWriteContext>> {
     let txn = unsafe { txn.as_ref() };
-    Arc::new(txn.get_write_context()).into()
+    let extern_engine = unsafe { engine.as_ref() };
+    get_write_context_impl(txn, extern_engine).into_extern_result(&extern_engine)
+}
+
+fn get_write_context_impl(
+    txn: &delta_kernel::transaction::Transaction,
+    extern_engine: &dyn ExternEngine,
+) -> DeltaResult<Handle<SharedWriteContext>> {
+    let engine = extern_engine.engine();
+    let write_context = txn.get_write_context(engine.as_ref())?;
+    Ok(Arc::new(write_context).into())
 }

 #[no_mangle]
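With this change, get_write_context takes both a transaction handle and an engine handle and returns an ExternResult, so FFI callers unwrap the result before using it. A minimal sketch of the call pattern, mirroring the test update in ffi/src/transaction/mod.rs above; the txn and engine handles and the ok_or_panic helper are assumed to exist in the caller's test setup:

// Sketch only: `txn` and `engine` are handles created earlier by the caller.
let write_context = ok_or_panic(unsafe {
    get_write_context(txn.shallow_copy(), engine.shallow_copy())
});
// The resulting write-context handle is used as before, e.g. to fetch the write schema:
let write_schema = unsafe { get_write_schema(write_context.shallow_copy()) };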

kernel/examples/write-table/src/main.rs

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ async fn try_main() -> DeltaResult<()> {
         .with_data_change(true);

     // Write the data using the engine
-    let write_context = Arc::new(txn.get_write_context());
+    let write_context = Arc::new(txn.get_write_context(&engine)?);
     let file_metadata = engine
         .write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
         .await?;
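For engine-side callers, the only call-site change is that Transaction::get_write_context is now fallible and takes a reference to the engine (and, per the commit message, the returned WriteContext now also carries stats_columns). A minimal sketch of the updated write path, assuming txn, engine, and sample_data are set up as earlier in this example:

// Sketch only: `txn`, `engine`, and `sample_data` come from earlier in the example.
let write_context = Arc::new(txn.get_write_context(&engine)?); // now takes the engine, returns a DeltaResult
let file_metadata = engine
    .write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
    .await?;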

kernel/src/scan/data_skipping/stats_schema.rs

Lines changed: 160 additions & 27 deletions
@@ -93,18 +93,29 @@ use crate::{
 /// tightBounds: boolean,
 /// }
 /// ```
+/// Generates the expected schema for file statistics.
+///
+/// # Arguments
+///
+/// * `physical_file_schema` - The physical schema of the data files (no partition columns)
+/// * `table_properties` - Table properties containing stats configuration
+/// * `clustering_columns` - List of clustering columns that must always be included
+///   in statistics regardless of `dataSkippingNumIndexedCols` limits. Pass an empty slice
+///   if there are no clustering columns.
 #[allow(unused)]
 pub(crate) fn expected_stats_schema(
     physical_file_schema: &Schema,
     table_properties: &TableProperties,
+    clustering_columns: &[ColumnName],
 ) -> DeltaResult<Schema> {
     let mut fields = Vec::with_capacity(5);
     fields.push(StructField::nullable("numRecords", DataType::LONG));

     // generate the base stats schema:
     // - make all fields nullable
     // - include fields according to table properties (num_indexed_cols, stats_columns, ...)
-    let mut base_transform = BaseStatsTransform::new(table_properties);
+    // - always include clustering columns (they don't count against the column limit)
+    let mut base_transform = BaseStatsTransform::new(table_properties, clustering_columns);
     if let Some(base_schema) = base_transform.transform_struct(physical_file_schema) {
         let base_schema = base_schema.into_owned();

@@ -184,26 +195,30 @@ impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
 ///   to be used for data skipping statistics. (takes precedence)
 /// * `dataSkippingNumIndexedCols` - used to specify the number of columns
 ///   to be used for data skipping statistics. Defaults to 32.
+/// * Clustering columns are always included regardless of the above limits.
 ///
 /// All fields are nullable.
 #[allow(unused)]
-struct BaseStatsTransform {
+struct BaseStatsTransform<'a> {
     n_columns: Option<DataSkippingNumIndexedCols>,
     added_columns: u64,
     column_names: Option<Vec<ColumnName>>,
+    /// Clustering columns that must always be included (don't count against limit)
+    clustering_columns: &'a [ColumnName],
     path: Vec<String>,
 }

-impl BaseStatsTransform {
+impl<'a> BaseStatsTransform<'a> {
     #[allow(unused)]
-    fn new(props: &TableProperties) -> Self {
+    fn new(props: &TableProperties, clustering_columns: &'a [ColumnName]) -> Self {
         // If data_skipping_stats_columns is specified, it takes precedence
         // over data_skipping_num_indexed_cols, even if that is also specified.
         if let Some(column_names) = &props.data_skipping_stats_columns {
             Self {
                 n_columns: None,
                 added_columns: 0,
                 column_names: Some(column_names.clone()),
+                clustering_columns,
                 path: Vec::new(),
             }
         } else {
@@ -214,44 +229,63 @@ impl BaseStatsTransform {
                 n_columns: Some(n_cols),
                 added_columns: 0,
                 column_names: None,
+                clustering_columns,
                 path: Vec::new(),
             }
         }
     }
+
+    /// Checks if the current path matches a clustering column.
+    /// Clustering columns are always included regardless of column limits.
+    fn is_clustering_column(&self) -> bool {
+        let current = ColumnName::new(&self.path);
+        self.clustering_columns.iter().any(|c| c == &current)
+    }
 }

-impl<'a> SchemaTransform<'a> for BaseStatsTransform {
+impl<'a, 'b> SchemaTransform<'a> for BaseStatsTransform<'b> {
     fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
         use Cow::*;

+        self.path.push(field.name.clone());
+        let data_type = field.data_type();
+
+        // Clustering columns are always included, regardless of limits
+        let is_clustering = self.is_clustering_column();
+
         // Check if the number of columns is set and if the added columns exceed the limit
-        // In the constructor we assert this will always be None if column_names are specified
-        if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns {
-            if self.added_columns >= n_cols {
-                return None;
+        // Clustering columns bypass this limit
+        if !is_clustering {
+            if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns {
+                if self.added_columns >= n_cols {
+                    self.path.pop();
+                    return None;
+                }
             }
         }

-        self.path.push(field.name.clone());
-        let data_type = field.data_type();
-
         // We always traverse struct fields (they don't count against the column limit),
         // but we only include leaf fields if they qualify based on column_names config.
         // When column_names is None, all leaf fields are included (up to n_columns limit).
+        // Clustering columns are always included regardless of column_names config.
         if !matches!(data_type, DataType::Struct(_)) {
-            let should_include = self
-                .column_names
-                .as_ref()
-                .map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
-                .unwrap_or(true);
+            let should_include = is_clustering
+                || self
+                    .column_names
+                    .as_ref()
+                    .map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
+                    .unwrap_or(true);

             if !should_include {
                 self.path.pop();
                 return None;
             }

-            // Increment count only for leaf columns
-            self.added_columns += 1;
+            // Increment count only for non-clustering leaf columns
+            // Clustering columns don't count against the limit
+            if !is_clustering {
+                self.added_columns += 1;
+            }
         }

         let field = match self.transform(&field.data_type)? {
@@ -363,7 +397,7 @@ mod tests {
         let properties: TableProperties = [("key", "value")].into();
         let file_schema = StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]);

-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();
         let expected = StructType::new_unchecked([
             StructField::nullable("numRecords", DataType::LONG),
             StructField::nullable("nullCount", file_schema.clone()),
@@ -387,7 +421,7 @@
             StructField::not_null("id", DataType::LONG),
             StructField::not_null("user", DataType::Struct(Box::new(user_struct.clone()))),
         ]);
-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();

         // Expected result: The stats schema should maintain the nested structure
         // but make all fields nullable
@@ -437,7 +471,7 @@
             ),
         ]);

-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();

         let expected_null_nested = StructType::new_unchecked([
             StructField::nullable("name", DataType::LONG),
@@ -486,7 +520,7 @@
             StructField::nullable("user.info", DataType::Struct(Box::new(user_struct.clone()))),
         ]);

-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();

         let expected_nested =
             StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
@@ -523,7 +557,7 @@
             StructField::nullable("age", DataType::INTEGER),
         ]);

-        let stats_schema = expected_stats_schema(&logical_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&logical_schema, &properties, &[]).unwrap();

         let expected_fields =
             StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
@@ -557,7 +591,7 @@
             StructField::nullable("metadata", DataType::BINARY),
         ]);

-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();

         // Expected nullCount schema: all fields converted to LONG
         let expected_null_count = StructType::new_unchecked([
@@ -599,7 +633,7 @@
             StructField::nullable("is_deleted", DataType::BOOLEAN), // NOT eligible for min/max
         ]);

-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();

         // Expected nullCount schema: all fields converted to LONG, maintaining structure
        let expected_null_user = StructType::new_unchecked([
@@ -649,7 +683,7 @@
             ),
         ]);

-        let stats_schema = expected_stats_schema(&file_schema, &properties).unwrap();
+        let stats_schema = expected_stats_schema(&file_schema, &properties, &[]).unwrap();

         // Expected nullCount schema: all fields converted to LONG
         let expected_null_count = StructType::new_unchecked([
@@ -669,4 +703,103 @@

         assert_eq!(&expected, &stats_schema);
     }
+
+    #[test]
+    fn test_stats_schema_clustering_columns_bypass_limit() {
+        // Set limit to 1 column, but clustering columns should still be included
+        let properties: TableProperties = [(
+            "delta.dataSkippingNumIndexedCols".to_string(),
+            "1".to_string(),
+        )]
+        .into();
+
+        let file_schema = StructType::new_unchecked([
+            StructField::nullable("id", DataType::LONG),
+            StructField::nullable("name", DataType::STRING),
+            StructField::nullable("cluster_col", DataType::INTEGER),
+        ]);
+
+        // Without clustering columns: only first column (id) should be included
+        let stats_schema_no_cluster =
+            expected_stats_schema(&file_schema, &properties, &[]).unwrap();
+
+        let expected_no_cluster =
+            StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]);
+        let null_count_no_cluster = NullCountStatsTransform
+            .transform_struct(&expected_no_cluster)
+            .unwrap()
+            .into_owned();
+
+        let expected_no_cluster = StructType::new_unchecked([
+            StructField::nullable("numRecords", DataType::LONG),
+            StructField::nullable("nullCount", null_count_no_cluster),
+            StructField::nullable("minValues", expected_no_cluster.clone()),
+            StructField::nullable("maxValues", expected_no_cluster),
+            StructField::nullable("tightBounds", DataType::BOOLEAN),
+        ]);
+        assert_eq!(&expected_no_cluster, &stats_schema_no_cluster);
+
+        // With clustering columns: id (from limit) + cluster_col (clustering) should be included
+        let clustering_cols = vec![ColumnName::new(["cluster_col"])];
+        let stats_schema_with_cluster =
+            expected_stats_schema(&file_schema, &properties, &clustering_cols).unwrap();
+
+        let expected_with_cluster = StructType::new_unchecked([
+            StructField::nullable("id", DataType::LONG),
+            StructField::nullable("cluster_col", DataType::INTEGER),
+        ]);
+        let null_count_with_cluster = NullCountStatsTransform
+            .transform_struct(&expected_with_cluster)
+            .unwrap()
+            .into_owned();
+
+        let expected_with_cluster = StructType::new_unchecked([
+            StructField::nullable("numRecords", DataType::LONG),
+            StructField::nullable("nullCount", null_count_with_cluster),
+            StructField::nullable("minValues", expected_with_cluster.clone()),
+            StructField::nullable("maxValues", expected_with_cluster),
+            StructField::nullable("tightBounds", DataType::BOOLEAN),
+        ]);
+        assert_eq!(&expected_with_cluster, &stats_schema_with_cluster);
+    }
+
+    #[test]
+    fn test_stats_schema_clustering_columns_with_stats_columns() {
+        // When dataSkippingStatsColumns is set, clustering columns should still be included
+        let properties: TableProperties = [(
+            "delta.dataSkippingStatsColumns".to_string(),
+            "id".to_string(),
+        )]
+        .into();
+
+        let file_schema = StructType::new_unchecked([
+            StructField::nullable("id", DataType::LONG),
+            StructField::nullable("name", DataType::STRING),
+            StructField::nullable("cluster_col", DataType::INTEGER),
+        ]);
+
+        // With dataSkippingStatsColumns=id and clustering_columns=[cluster_col],
+        // both should be included
+        let clustering_cols = vec![ColumnName::new(["cluster_col"])];
+        let stats_schema =
+            expected_stats_schema(&file_schema, &properties, &clustering_cols).unwrap();
+
+        let expected_fields = StructType::new_unchecked([
+            StructField::nullable("id", DataType::LONG),
+            StructField::nullable("cluster_col", DataType::INTEGER),
+        ]);
+        let null_count = NullCountStatsTransform
+            .transform_struct(&expected_fields)
+            .unwrap()
+            .into_owned();
+
+        let expected = StructType::new_unchecked([
+            StructField::nullable("numRecords", DataType::LONG),
+            StructField::nullable("nullCount", null_count),
+            StructField::nullable("minValues", expected_fields.clone()),
+            StructField::nullable("maxValues", expected_fields),
+            StructField::nullable("tightBounds", DataType::BOOLEAN),
+        ]);
+        assert_eq!(&expected, &stats_schema);
+    }
 }
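The clustering check in BaseStatsTransform is path-based: the transform tracks the field path it is descending and compares ColumnName::new(&self.path) against the supplied clustering columns, so a nested clustering column should be matched on its full path. A minimal illustrative sketch of that comparison (the user/id column and the path vector here are hypothetical, not taken from the tests above):

// Illustrative only: a clustering column matches when the transform's current
// path equals the column's full path.
let clustering = [ColumnName::new(["user", "id"])];
let path: Vec<String> = vec!["user".to_string(), "id".to_string()]; // as tracked during traversal
assert!(clustering.iter().any(|c| c == &ColumnName::new(&path)));
// A sibling leaf such as ["user", "name"] would not match, and so would still
// count against the dataSkippingNumIndexedCols limit.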
