Skip to content

Commit 4f68786

Browse files
committed
feat: add stats_columns to ParquetHandler + Transaction stats API
- Add stats_columns parameter to write_parquet_file trait - Add stats_schema(), stats_columns(), get_clustering_columns() to Transaction - Add stats_columns to WriteContext - Update get_write_context() to take engine parameter - Add clustering column support to expected_stats_schema()
1 parent 33e131d commit 4f68786

File tree

4 files changed

+42
-18
lines changed

4 files changed

+42
-18
lines changed

kernel/src/engine/default/parquet.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
294294
/// - `location` - The full URL path where the Parquet file should be written
295295
/// (e.g., `s3://bucket/path/file.parquet`, `file:///path/to/file.parquet`).
296296
/// - `data` - An iterator of engine data to be written to the Parquet file.
297+
/// - `stats_columns` - Column names for which statistics should be collected.
297298
///
298299
/// # Returns
299300
///
@@ -302,6 +303,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
302303
&self,
303304
location: url::Url,
304305
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
306+
_stats_columns: &[String],
305307
) -> DeltaResult<()> {
306308
let store = self.store.clone();
307309

@@ -776,7 +778,7 @@ mod tests {
776778
// Test writing through the trait method
777779
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
778780
parquet_handler
779-
.write_parquet_file(file_url.clone(), data_iter)
781+
.write_parquet_file(file_url.clone(), data_iter, &[])
780782
.unwrap();
781783

782784
// Verify we can read the file back
@@ -964,7 +966,7 @@ mod tests {
964966
// Write the data
965967
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
966968
parquet_handler
967-
.write_parquet_file(file_url.clone(), data_iter)
969+
.write_parquet_file(file_url.clone(), data_iter, &[])
968970
.unwrap();
969971

970972
// Read it back
@@ -1152,7 +1154,7 @@ mod tests {
11521154

11531155
// Write the first file
11541156
parquet_handler
1155-
.write_parquet_file(file_url.clone(), data_iter1)
1157+
.write_parquet_file(file_url.clone(), data_iter1, &[])
11561158
.unwrap();
11571159

11581160
// Create second data set with different data
@@ -1168,7 +1170,7 @@ mod tests {
11681170

11691171
// Overwrite with second file (overwrite=true)
11701172
parquet_handler
1171-
.write_parquet_file(file_url.clone(), data_iter2)
1173+
.write_parquet_file(file_url.clone(), data_iter2, &[])
11721174
.unwrap();
11731175

11741176
// Read back and verify it contains the second data set
@@ -1231,7 +1233,7 @@ mod tests {
12311233

12321234
// Write the first file
12331235
parquet_handler
1234-
.write_parquet_file(file_url.clone(), data_iter1)
1236+
.write_parquet_file(file_url.clone(), data_iter1, &[])
12351237
.unwrap();
12361238

12371239
// Create second data set
@@ -1247,7 +1249,7 @@ mod tests {
12471249

12481250
// Write again - should overwrite successfully (new behavior always overwrites)
12491251
parquet_handler
1250-
.write_parquet_file(file_url.clone(), data_iter2)
1252+
.write_parquet_file(file_url.clone(), data_iter2, &[])
12511253
.unwrap();
12521254

12531255
// Verify the file was overwritten with the new data

kernel/src/engine/sync/parquet.rs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ impl ParquetHandler for SyncParquetHandler {
7979
/// - `location` - The full URL path where the Parquet file should be written
8080
/// (e.g., `file:///path/to/file.parquet`).
8181
/// - `data` - An iterator of engine data to be written to the Parquet file.
82+
/// - `stats_columns` - Column names for which statistics should be collected.
8283
///
8384
/// # Returns
8485
///
@@ -87,6 +88,7 @@ impl ParquetHandler for SyncParquetHandler {
8788
&self,
8889
location: Url,
8990
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn crate::EngineData>>> + Send>,
91+
_stats_columns: &[String],
9092
) -> DeltaResult<()> {
9193
// Convert URL to file path
9294
let path = location
@@ -115,6 +117,7 @@ impl ParquetHandler for SyncParquetHandler {
115117

116118
writer.close()?; // writer must be closed to write footer
117119

120+
// TODO: Implement stats collection for SyncEngine
118121
Ok(())
119122
}
120123

@@ -174,7 +177,9 @@ mod tests {
174177
> = Box::new(std::iter::once(Ok(engine_data)));
175178

176179
// Write the file
177-
handler.write_parquet_file(url.clone(), data_iter).unwrap();
180+
handler
181+
.write_parquet_file(url.clone(), data_iter, &[])
182+
.unwrap();
178183

179184
// Verify the file exists
180185
assert!(file_path.exists());
@@ -295,7 +300,9 @@ mod tests {
295300
> = Box::new(std::iter::once(Ok(engine_data)));
296301

297302
// Write the file
298-
handler.write_parquet_file(url.clone(), data_iter).unwrap();
303+
handler
304+
.write_parquet_file(url.clone(), data_iter, &[])
305+
.unwrap();
299306

300307
// Verify the file exists
301308
assert!(file_path.exists());
@@ -370,7 +377,9 @@ mod tests {
370377
> = Box::new(std::iter::once(Ok(engine_data1)));
371378

372379
// Write the first file
373-
handler.write_parquet_file(url.clone(), data_iter1).unwrap();
380+
handler
381+
.write_parquet_file(url.clone(), data_iter1, &[])
382+
.unwrap();
374383
assert!(file_path.exists());
375384

376385
// Create second data set with different data
@@ -386,7 +395,9 @@ mod tests {
386395
> = Box::new(std::iter::once(Ok(engine_data2)));
387396

388397
// Overwrite with second file (overwrite=true)
389-
handler.write_parquet_file(url.clone(), data_iter2).unwrap();
398+
handler
399+
.write_parquet_file(url.clone(), data_iter2, &[])
400+
.unwrap();
390401

391402
// Read back and verify it contains the second data set
392403
let file = File::open(&file_path).unwrap();
@@ -445,7 +456,9 @@ mod tests {
445456
> = Box::new(std::iter::once(Ok(engine_data1)));
446457

447458
// Write the first file
448-
handler.write_parquet_file(url.clone(), data_iter1).unwrap();
459+
handler
460+
.write_parquet_file(url.clone(), data_iter1, &[])
461+
.unwrap();
449462
assert!(file_path.exists());
450463

451464
// Create second data set
@@ -461,7 +474,9 @@ mod tests {
461474
> = Box::new(std::iter::once(Ok(engine_data2)));
462475

463476
// Write again - should overwrite successfully (new behavior always overwrites)
464-
handler.write_parquet_file(url.clone(), data_iter2).unwrap();
477+
handler
478+
.write_parquet_file(url.clone(), data_iter2, &[])
479+
.unwrap();
465480

466481
// Verify the file was overwritten with the new data
467482
let file = File::open(&file_path).unwrap();
@@ -537,7 +552,9 @@ mod tests {
537552
> = Box::new(batches.into_iter());
538553

539554
// Write the file
540-
handler.write_parquet_file(url.clone(), data_iter).unwrap();
555+
handler
556+
.write_parquet_file(url.clone(), data_iter, &[])
557+
.unwrap();
541558

542559
// Verify the file exists
543560
assert!(file_path.exists());

kernel/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,9 +778,10 @@ pub trait ParquetHandler: AsAny {
778778
predicate: Option<PredicateRef>,
779779
) -> DeltaResult<FileDataReadResultIterator>;
780780

781-
/// Write data to a Parquet file at the specified URL.
781+
/// Write data to a Parquet file at the specified URL, collecting statistics.
782782
///
783-
/// This method writes the provided `data` to a Parquet file at the given `url`.
783+
/// This method writes the provided `data` to a Parquet file at the given `url`,
784+
/// and collects statistics (min, max, null count) for the specified columns.
784785
///
785786
/// This will overwrite the file if it already exists.
786787
///
@@ -789,6 +790,7 @@ pub trait ParquetHandler: AsAny {
789790
/// - `url` - The full URL path where the Parquet file should be written
790791
/// (e.g., `s3://bucket/path/file.parquet`).
791792
/// - `data` - An iterator of engine data to be written to the Parquet file.
793+
/// - `stats_columns` - Column names for which statistics should be collected.
792794
///
793795
/// # Returns
794796
///
@@ -797,6 +799,7 @@ pub trait ParquetHandler: AsAny {
797799
&self,
798800
location: url::Url,
799801
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
802+
_stats_columns: &[String],
800803
) -> DeltaResult<()>;
801804

802805
/// Read the footer metadata from a Parquet file without reading the data.

kernel/src/snapshot.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,11 @@ impl Snapshot {
441441
let data_iter = writer.checkpoint_data(engine)?;
442442
let state = data_iter.state();
443443
let lazy_data = data_iter.map(|r| r.and_then(|f| f.apply_selection_vector()));
444-
engine
445-
.parquet_handler()
446-
.write_parquet_file(checkpoint_path.clone(), Box::new(lazy_data))?;
444+
engine.parquet_handler().write_parquet_file(
445+
checkpoint_path.clone(),
446+
Box::new(lazy_data),
447+
&[],
448+
)?;
447449

448450
let file_meta = engine.storage_handler().head(&checkpoint_path)?;
449451

0 commit comments

Comments
 (0)