
Commit ae3ea82

write-stats-all
1 parent 70722ec commit ae3ea82

13 files changed: +2418 −83 lines changed

kernel/examples/write-table/src/main.rs

Lines changed: 2 additions & 2 deletions
@@ -94,9 +94,9 @@ async fn try_main() -> DeltaResult<()> {
         .with_data_change(true);

     // Write the data using the engine
-    let write_context = Arc::new(txn.get_write_context());
+    let write_context = txn.get_write_context(&engine)?;
     let file_metadata = engine
-        .write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
+        .write_parquet(&sample_data, &write_context, HashMap::new())
         .await?;

     // Add the file metadata to the transaction
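
A minimal sketch of how the updated example drives a write end to end. The names are taken from the hunk above; the trailing add_files call is assumed from the example's closing comment and is not part of this hunk:

    // Sketch only: `txn`, `engine`, and `sample_data` are constructed earlier in the example.
    let write_context = txn.get_write_context(&engine)?;
    let file_metadata = engine
        .write_parquet(&sample_data, &write_context, HashMap::new())
        .await?;
    // Assumed follow-up, per the "Add the file metadata to the transaction" comment:
    txn.add_files(file_metadata);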

kernel/src/engine/default/mod.rs

Lines changed: 7 additions & 1 deletion
@@ -33,6 +33,7 @@ pub mod file_stream;
 pub mod filesystem;
 pub mod json;
 pub mod parquet;
+pub(crate) mod stats;
 pub mod storage;

 /// Converts a Stream-producing future to a synchronous iterator.
@@ -216,7 +217,12 @@ impl<E: TaskExecutor> DefaultEngine<E> {
         )?;
         let physical_data = logical_to_physical_expr.evaluate(data)?;
         self.parquet
-            .write_parquet_file(write_context.target_dir(), physical_data, partition_values)
+            .write_parquet_file(
+                write_context.target_dir(),
+                physical_data,
+                partition_values,
+                write_context.stats_columns(),
+            )
             .await
     }
 }
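
For reference, the call above now targets the four-argument handler signature defined further down in parquet.rs; `stats_columns` limits which columns get per-file statistics:

    pub async fn write_parquet_file(
        &self,
        path: &url::Url,
        data: Box<dyn EngineData>,
        partition_values: HashMap<String, String>,
        stats_columns: &[String],
    ) -> DeltaResult<Box<dyn EngineData>>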

kernel/src/engine/default/parquet.rs

Lines changed: 94 additions & 26 deletions
@@ -8,7 +8,7 @@ use delta_kernel_derive::internal_api;

 use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
 use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
-use crate::arrow::datatypes::{DataType, Field};
+use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
 use crate::parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
 };
@@ -23,9 +23,10 @@ use object_store::{DynObjectStore, ObjectStore};
 use uuid::Uuid;

 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use super::stats::StatisticsCollector;
 use super::UrlExt;
 use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
-use crate::engine::arrow_data::ArrowEngineData;
+use crate::engine::arrow_data::{extract_record_batch, ArrowEngineData};
 use crate::engine::arrow_utils::{
     fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
     RowIndexBuilder,
@@ -47,23 +48,31 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {

 /// Metadata of a data file (typically a parquet file).
 ///
-/// Currently just includes the the number of records as statistics, but will expand to include
-/// more statistics and other metadata in the future.
+/// Includes full file statistics collected during write.
 #[derive(Debug)]
 pub struct DataFileMetadata {
     file_meta: FileMeta,
     // NB: We use usize instead of u64 since arrow uses usize for record batch sizes
     num_records: usize,
+    // Optional full statistics (numRecords, nullCount, minValues, maxValues, tightBounds)
+    stats: Option<StructArray>,
 }

 impl DataFileMetadata {
     pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
         Self {
             file_meta,
             num_records,
+            stats: None,
         }
     }

+    /// Attach full statistics to this metadata.
+    pub fn with_stats(mut self, stats: StructArray) -> Self {
+        self.stats = Some(stats);
+        self
+    }
+
     /// Convert DataFileMetadata into a record batch which matches the schema returned by
     /// [`add_files_schema`].
     ///
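
The new with_stats builder lets the writer attach a pre-computed statistics struct before the metadata is converted into an add-files batch. A hedged usage sketch, assuming a `file_meta`, a row count, a single-row `stats` StructArray (such as StatisticsCollector::finalize produces), and a `partition_values` map are already in scope:

    let metadata = DataFileMetadata::new(file_meta, num_records).with_stats(stats);
    let add_files_batch = metadata.as_record_batch(&partition_values)?;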
@@ -81,7 +90,9 @@ impl DataFileMetadata {
                 size,
             },
             num_records,
+            stats,
         } = self;
+
         // create the record batch of the write metadata
         let path = Arc::new(StringArray::from(vec![location.to_string()]));
         let key_builder = StringBuilder::new();
@@ -104,20 +115,43 @@
             .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
         let size = Arc::new(Int64Array::from(vec![size]));
         let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
-        let stats = Arc::new(StructArray::try_new_with_length(
-            vec![Field::new("numRecords", DataType::Int64, true)].into(),
-            vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
-            None,
-            1,
-        )?);
+
+        // Use full stats if available, otherwise minimal stats with just numRecords
+        let stats_array: Arc<StructArray> = if let Some(full_stats) = stats {
+            Arc::new(full_stats.clone())
+        } else {
+            Arc::new(StructArray::try_new_with_length(
+                vec![Field::new("numRecords", DataType::Int64, true)].into(),
+                vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
+                None,
+                1,
+            )?)
+        };
+
+        // Build schema dynamically based on the stats array's actual schema
+        let base_schema: ArrowSchema = crate::transaction::BASE_ADD_FILES_SCHEMA
+            .as_ref()
+            .try_into_arrow()?;
+
+        // Replace the stats field with the actual stats schema
+        let mut fields: Vec<Field> = base_schema
+            .fields()
+            .iter()
+            .map(|f| f.as_ref().clone())
+            .collect();
+        // The last field is stats - replace it with the actual stats schema
+        if let Some(last) = fields.last_mut() {
+            *last = Field::new(
+                "stats",
+                DataType::Struct(stats_array.fields().clone()),
+                true,
+            );
+        }
+        let schema = Arc::new(ArrowSchema::new(fields));

         Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
-            Arc::new(
-                crate::transaction::BASE_ADD_FILES_SCHEMA
-                    .as_ref()
-                    .try_into_arrow()?,
-            ),
-            vec![path, partitions, size, modification_time, stats],
+            schema,
+            vec![path, partitions, size, modification_time, stats_array],
         )?)))
     }
 }
@@ -196,13 +230,36 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
     /// use [`crate::transaction::Transaction::with_data_change`].
     ///
     /// [add file metadata]: crate::transaction::Transaction::add_files_schema
+    /// Write data to a parquet file and return file metadata with statistics.
+    ///
+    /// This writes the data to a parquet file at the given path, collects statistics for
+    /// the specified columns, and returns the file metadata as an EngineData batch which
+    /// matches the [add file metadata] schema.
+    ///
+    /// Only collects statistics for columns in `stats_columns`.
+    ///
+    /// [add file metadata]: crate::transaction::Transaction::add_files_schema
     pub async fn write_parquet_file(
         &self,
         path: &url::Url,
         data: Box<dyn EngineData>,
         partition_values: HashMap<String, String>,
+        stats_columns: &[String],
     ) -> DeltaResult<Box<dyn EngineData>> {
-        let parquet_metadata = self.write_parquet(path, data).await?;
+        // Collect statistics from the data during write
+        let record_batch = extract_record_batch(data.as_ref())?;
+
+        // Initialize stats collector and update with this batch
+        let mut stats_collector = StatisticsCollector::new(record_batch.schema(), stats_columns);
+        stats_collector.update(record_batch, None)?; // No mask for new file writes
+        let stats = stats_collector.finalize()?;
+
+        // Write the parquet file
+        let mut parquet_metadata = self.write_parquet(path, data).await?;
+
+        // Attach the collected statistics
+        parquet_metadata = parquet_metadata.with_stats(stats);
+
         parquet_metadata.as_record_batch(&partition_values)
     }
 }
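
StatisticsCollector (defined in the new stats module, one of the 13 changed files but not shown in this excerpt) is driven as a simple new/update/finalize accumulator. A sketch inferred from the call sites above; treat the signatures as assumptions, not the module's documented API:

    // Sketch only: `schema`, `stats_columns`, and `batches` (a Vec<RecordBatch>) assumed in scope.
    let mut collector = StatisticsCollector::new(schema.clone(), &stats_columns);
    for batch in &batches {
        collector.update(batch, None)?; // None = no row mask; new-file writes have no deletions
    }
    let stats: StructArray = collector.finalize()?; // single-row struct fed to DataFileMetadata::with_stats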
@@ -302,8 +359,10 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
         &self,
         location: url::Url,
         mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
+        stats_columns: &[String],
     ) -> DeltaResult<()> {
         let store = self.store.clone();
+        let stats_columns = stats_columns.to_vec();

         self.task_executor.block_on(async move {
             let path = Path::from_url_path(location.path())?;
@@ -317,21 +376,29 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {

             let object_writer = ParquetObjectWriter::new(store, path);
             let schema = first_record_batch.schema();
-            let mut writer = AsyncArrowWriter::try_new(object_writer, schema, None)?;
+            let mut writer = AsyncArrowWriter::try_new(object_writer, schema.clone(), None)?;

-            // Write the first batch
+            // Initialize stats collector
+            let mut stats_collector = StatisticsCollector::new(schema, &stats_columns);
+
+            // Write and collect stats for the first batch (no mask for new file writes)
             writer.write(&first_record_batch).await?;
+            stats_collector.update(&first_record_batch, None)?;

-            // Write remaining batches
+            // Write remaining batches and accumulate stats
             for result in data {
                 let engine_data = result?;
                 let arrow_data = ArrowEngineData::try_from_engine_data(engine_data)?;
                 let batch: RecordBatch = (*arrow_data).into();
                 writer.write(&batch).await?;
+                stats_collector.update(&batch, None)?; // No mask for new file writes
             }

             writer.finish().await?;

+            // Finalize stats (collected but not returned for now)
+            let _stats = stats_collector.finalize()?;
+
             Ok(())
         })
     }
@@ -682,6 +749,7 @@ mod tests {
                 size,
             },
             num_records,
+            stats: _,
         } = write_metadata;
         let expected_location = Url::parse("memory:///data/").unwrap();

@@ -776,7 +844,7 @@
         // Test writing through the trait method
         let file_url = Url::parse("memory:///test/data.parquet").unwrap();
         parquet_handler
-            .write_parquet_file(file_url.clone(), data_iter)
+            .write_parquet_file(file_url.clone(), data_iter, &[])
             .unwrap();

         // Verify we can read the file back
@@ -964,7 +1032,7 @@
         // Write the data
        let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
         parquet_handler
-            .write_parquet_file(file_url.clone(), data_iter)
+            .write_parquet_file(file_url.clone(), data_iter, &[])
             .unwrap();

         // Read it back
@@ -1152,7 +1220,7 @@

         // Write the first file
         parquet_handler
-            .write_parquet_file(file_url.clone(), data_iter1)
+            .write_parquet_file(file_url.clone(), data_iter1, &[])
             .unwrap();

         // Create second data set with different data
@@ -1168,7 +1236,7 @@

         // Overwrite with second file (overwrite=true)
         parquet_handler
-            .write_parquet_file(file_url.clone(), data_iter2)
+            .write_parquet_file(file_url.clone(), data_iter2, &[])
             .unwrap();

         // Read back and verify it contains the second data set
@@ -1231,7 +1299,7 @@

         // Write the first file
         parquet_handler
-            .write_parquet_file(file_url.clone(), data_iter1)
+            .write_parquet_file(file_url.clone(), data_iter1, &[])
             .unwrap();

         // Create second data set
@@ -1247,7 +1315,7 @@

         // Write again - should overwrite successfully (new behavior always overwrites)
         parquet_handler
-            .write_parquet_file(file_url.clone(), data_iter2)
+            .write_parquet_file(file_url.clone(), data_iter2, &[])
             .unwrap();

         // Verify the file was overwritten with the new data
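
The updated tests pass an empty slice to skip statistics collection entirely; requesting stats for specific columns uses the same call shape. A hedged example, where the "id" column name is an assumption for illustration only:

    parquet_handler
        .write_parquet_file(file_url.clone(), data_iter, &["id".to_string()])
        .unwrap();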
