Skip to content

Commit 82a99b2

Browse files
committed
feat: add stats_columns to ParquetHandler + Transaction stats API
- Add stats_columns parameter to the write_parquet_file trait method
- Add stats_schema(), stats_columns(), get_clustering_columns() to Transaction
- Add stats_columns to WriteContext
- Update get_write_context() to take engine parameter
- Add clustering column support to expected_stats_schema()
1 parent d4ecc0a commit 82a99b2

File tree

12 files changed

+366
-69
lines changed

12 files changed

+366
-69
lines changed

kernel/examples/write-table/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ async fn try_main() -> DeltaResult<()> {
9494
.with_data_change(true);
9595

9696
// Write the data using the engine
97-
let write_context = Arc::new(txn.get_write_context());
97+
let write_context = Arc::new(txn.get_write_context(&engine)?);
9898
let file_metadata = engine
9999
.write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
100100
.await?;

kernel/src/engine/default/parquet.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
302302
&self,
303303
location: url::Url,
304304
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
305+
_stats_columns: &[String],
305306
) -> DeltaResult<()> {
306307
let store = self.store.clone();
307308

@@ -776,7 +777,7 @@ mod tests {
776777
// Test writing through the trait method
777778
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
778779
parquet_handler
779-
.write_parquet_file(file_url.clone(), data_iter)
780+
.write_parquet_file(file_url.clone(), data_iter, &[])
780781
.unwrap();
781782

782783
// Verify we can read the file back
@@ -964,7 +965,7 @@ mod tests {
964965
// Write the data
965966
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
966967
parquet_handler
967-
.write_parquet_file(file_url.clone(), data_iter)
968+
.write_parquet_file(file_url.clone(), data_iter, &[])
968969
.unwrap();
969970

970971
// Read it back
@@ -1152,7 +1153,7 @@ mod tests {
11521153

11531154
// Write the first file
11541155
parquet_handler
1155-
.write_parquet_file(file_url.clone(), data_iter1)
1156+
.write_parquet_file(file_url.clone(), data_iter1, &[])
11561157
.unwrap();
11571158

11581159
// Create second data set with different data
@@ -1168,7 +1169,7 @@ mod tests {
11681169

11691170
// Overwrite with second file (overwrite=true)
11701171
parquet_handler
1171-
.write_parquet_file(file_url.clone(), data_iter2)
1172+
.write_parquet_file(file_url.clone(), data_iter2, &[])
11721173
.unwrap();
11731174

11741175
// Read back and verify it contains the second data set
@@ -1231,7 +1232,7 @@ mod tests {
12311232

12321233
// Write the first file
12331234
parquet_handler
1234-
.write_parquet_file(file_url.clone(), data_iter1)
1235+
.write_parquet_file(file_url.clone(), data_iter1, &[])
12351236
.unwrap();
12361237

12371238
// Create second data set
@@ -1247,7 +1248,7 @@ mod tests {
12471248

12481249
// Write again - should overwrite successfully (new behavior always overwrites)
12491250
parquet_handler
1250-
.write_parquet_file(file_url.clone(), data_iter2)
1251+
.write_parquet_file(file_url.clone(), data_iter2, &[])
12511252
.unwrap();
12521253

12531254
// Verify the file was overwritten with the new data

kernel/src/engine/sync/parquet.rs

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ impl ParquetHandler for SyncParquetHandler {
8787
&self,
8888
location: Url,
8989
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn crate::EngineData>>> + Send>,
90+
_stats_columns: &[String],
9091
) -> DeltaResult<()> {
9192
// Convert URL to file path
9293
let path = location
@@ -115,6 +116,7 @@ impl ParquetHandler for SyncParquetHandler {
115116

116117
writer.close()?; // writer must be closed to write footer
117118

119+
// TODO: Implement stats collection for SyncEngine
118120
Ok(())
119121
}
120122

@@ -174,7 +176,9 @@ mod tests {
174176
> = Box::new(std::iter::once(Ok(engine_data)));
175177

176178
// Write the file
177-
handler.write_parquet_file(url.clone(), data_iter).unwrap();
179+
handler
180+
.write_parquet_file(url.clone(), data_iter, &[])
181+
.unwrap();
178182

179183
// Verify the file exists
180184
assert!(file_path.exists());
@@ -295,7 +299,9 @@ mod tests {
295299
> = Box::new(std::iter::once(Ok(engine_data)));
296300

297301
// Write the file
298-
handler.write_parquet_file(url.clone(), data_iter).unwrap();
302+
handler
303+
.write_parquet_file(url.clone(), data_iter, &[])
304+
.unwrap();
299305

300306
// Verify the file exists
301307
assert!(file_path.exists());
@@ -370,7 +376,9 @@ mod tests {
370376
> = Box::new(std::iter::once(Ok(engine_data1)));
371377

372378
// Write the first file
373-
handler.write_parquet_file(url.clone(), data_iter1).unwrap();
379+
handler
380+
.write_parquet_file(url.clone(), data_iter1, &[])
381+
.unwrap();
374382
assert!(file_path.exists());
375383

376384
// Create second data set with different data
@@ -386,7 +394,9 @@ mod tests {
386394
> = Box::new(std::iter::once(Ok(engine_data2)));
387395

388396
// Overwrite with second file (overwrite=true)
389-
handler.write_parquet_file(url.clone(), data_iter2).unwrap();
397+
handler
398+
.write_parquet_file(url.clone(), data_iter2, &[])
399+
.unwrap();
390400

391401
// Read back and verify it contains the second data set
392402
let file = File::open(&file_path).unwrap();
@@ -445,7 +455,9 @@ mod tests {
445455
> = Box::new(std::iter::once(Ok(engine_data1)));
446456

447457
// Write the first file
448-
handler.write_parquet_file(url.clone(), data_iter1).unwrap();
458+
handler
459+
.write_parquet_file(url.clone(), data_iter1, &[])
460+
.unwrap();
449461
assert!(file_path.exists());
450462

451463
// Create second data set
@@ -461,7 +473,9 @@ mod tests {
461473
> = Box::new(std::iter::once(Ok(engine_data2)));
462474

463475
// Write again - should overwrite successfully (new behavior always overwrites)
464-
handler.write_parquet_file(url.clone(), data_iter2).unwrap();
476+
handler
477+
.write_parquet_file(url.clone(), data_iter2, &[])
478+
.unwrap();
465479

466480
// Verify the file was overwritten with the new data
467481
let file = File::open(&file_path).unwrap();
@@ -537,7 +551,9 @@ mod tests {
537551
> = Box::new(batches.into_iter());
538552

539553
// Write the file
540-
handler.write_parquet_file(url.clone(), data_iter).unwrap();
554+
handler
555+
.write_parquet_file(url.clone(), data_iter, &[])
556+
.unwrap();
541557

542558
// Verify the file exists
543559
assert!(file_path.exists());

kernel/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -778,9 +778,10 @@ pub trait ParquetHandler: AsAny {
778778
predicate: Option<PredicateRef>,
779779
) -> DeltaResult<FileDataReadResultIterator>;
780780

781-
/// Write data to a Parquet file at the specified URL.
781+
/// Write data to a Parquet file at the specified URL, collecting statistics.
782782
///
783-
/// This method writes the provided `data` to a Parquet file at the given `url`.
783+
/// This method writes the provided `data` to a Parquet file at the given `url`,
784+
/// and collects statistics (min, max, null count) for the specified columns.
784785
///
785786
/// This will overwrite the file if it already exists.
786787
///
@@ -789,6 +790,7 @@ pub trait ParquetHandler: AsAny {
789790
/// - `url` - The full URL path where the Parquet file should be written
790791
/// (e.g., `s3://bucket/path/file.parquet`).
791792
/// - `data` - An iterator of engine data to be written to the Parquet file.
793+
/// - `stats_columns` - Column names for which statistics should be collected.
792794
///
793795
/// # Returns
794796
///
@@ -797,6 +799,7 @@ pub trait ParquetHandler: AsAny {
797799
&self,
798800
location: url::Url,
799801
data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
802+
stats_columns: &[String],
800803
) -> DeltaResult<()>;
801804

802805
/// Read the footer metadata from a Parquet file without reading the data.

0 commit comments

Comments
 (0)