Commit 9849c2c

feat: integrate StatisticsCollector into parquet writes
- Import StatisticsCollector in parquet.rs
- Add stats field to DataFileMetadata with with_stats() method
- Update as_record_batch to use full stats if available
- Update write_parquet_file to collect and attach stats
- Update mod.rs write_parquet to pass stats_columns
- Update write tests to expect full stats output
- Fix write-table example API call
1 parent fc10d58 commit 9849c2c
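
In short: the default engine now collects per-file column statistics during parquet writes and surfaces them in the add-files metadata. A minimal sketch of the resulting write path (illustrative only; `engine`, `txn`, and `sample_data` stand in for the setup shown in the write-table example below):

    // get_write_context() now returns a DeltaResult, so propagate with `?`
    // and pass a plain reference instead of wrapping the context in an Arc.
    let write_context = txn.get_write_context()?;
    let file_metadata = engine
        .write_parquet(&sample_data, &write_context, HashMap::new())
        .await?;
    // file_metadata now carries full stats (numRecords, nullCount, minValues,
    // maxValues, tightBounds) rather than numRecords alone.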

4 files changed: +88 −22 lines changed

kernel/examples/write-table/src/main.rs

Lines changed: 2 additions & 2 deletions
@@ -94,9 +94,9 @@ async fn try_main() -> DeltaResult<()> {
         .with_data_change(true);

     // Write the data using the engine
-    let write_context = Arc::new(txn.get_write_context());
+    let write_context = txn.get_write_context()?;
     let file_metadata = engine
-        .write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
+        .write_parquet(&sample_data, &write_context, HashMap::new())
         .await?;

     // Add the file metadata to the transaction

kernel/src/engine/default/mod.rs

Lines changed: 6 additions & 1 deletion
@@ -217,7 +217,12 @@ impl<E: TaskExecutor> DefaultEngine<E> {
         )?;
         let physical_data = logical_to_physical_expr.evaluate(data)?;
         self.parquet
-            .write_parquet_file(write_context.target_dir(), physical_data, partition_values)
+            .write_parquet_file(
+                write_context.target_dir(),
+                physical_data,
+                partition_values,
+                write_context.stats_columns(),
+            )
             .await
     }
 }
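
Callers that invoke the parquet handler directly must now thread the stats columns through as well. A sketch mirroring the updated call sites in kernel/tests/write.rs (assuming a `parquet_handler` and `data` already in scope):

    let write_metadata = parquet_handler
        .write_parquet_file(
            write_context.target_dir(),
            data,
            HashMap::new(),                // partition values
            write_context.stats_columns(), // columns to collect stats for
        )
        .await?;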

kernel/src/engine/default/parquet.rs

Lines changed: 74 additions & 15 deletions
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use delta_kernel_derive::internal_api;

 use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
-use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
+use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray, StructArray};
 use crate::arrow::datatypes::{DataType, Field};
 use crate::parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
@@ -23,9 +23,10 @@ use object_store::{DynObjectStore, ObjectStore};
 use uuid::Uuid;

 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use super::stats::StatisticsCollector;
 use super::UrlExt;
 use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
-use crate::engine::arrow_data::ArrowEngineData;
+use crate::engine::arrow_data::{extract_record_batch, ArrowEngineData};
 use crate::engine::arrow_utils::{
     fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
     RowIndexBuilder,
@@ -54,16 +55,25 @@ pub struct DataFileMetadata {
     file_meta: FileMeta,
     // NB: We use usize instead of u64 since arrow uses usize for record batch sizes
     num_records: usize,
+    /// Collected statistics for this file (optional).
+    stats: Option<StructArray>,
 }

 impl DataFileMetadata {
     pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
         Self {
             file_meta,
             num_records,
+            stats: None,
         }
     }

+    /// Set the collected statistics for this file.
+    pub fn with_stats(mut self, stats: StructArray) -> Self {
+        self.stats = Some(stats);
+        self
+    }
+
     /// Convert DataFileMetadata into a record batch which matches the schema returned by
     /// [`add_files_schema`].
     ///
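
with_stats() is a consuming builder, so statistics attach in a single expression; skipping the call leaves stats as None, which triggers the numRecords-only fallback below. A sketch, with file_meta, num_records, and stats as produced elsewhere in this diff:

    // With collected statistics attached:
    let metadata = DataFileMetadata::new(file_meta, num_records).with_stats(stats);
    // Without: stats stays None and as_record_batch falls back to numRecords only.
    let metadata = DataFileMetadata::new(file_meta, num_records);
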
@@ -81,6 +91,7 @@ impl DataFileMetadata {
                 size,
             },
             num_records,
+            stats,
         } = self;
         // create the record batch of the write metadata
         let path = Arc::new(StringArray::from(vec![location.to_string()]));
@@ -104,20 +115,53 @@
             .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
         let size = Arc::new(Int64Array::from(vec![size]));
         let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
-        let stats = Arc::new(StructArray::try_new_with_length(
-            vec![Field::new("numRecords", DataType::Int64, true)].into(),
-            vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
-            None,
-            1,
-        )?);

-        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
-            Arc::new(
-                crate::transaction::BASE_ADD_FILES_SCHEMA
-                    .as_ref()
-                    .try_into_arrow()?,
+        // Use full stats if available, otherwise just numRecords
+        let stats_array: Arc<StructArray> = if let Some(full_stats) = stats {
+            Arc::new(full_stats.clone())
+        } else {
+            Arc::new(StructArray::try_new_with_length(
+                vec![Field::new("numRecords", DataType::Int64, true)].into(),
+                vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
+                None,
+                1,
+            )?)
+        };
+
+        // Build schema dynamically based on stats
+        let stats_field = Field::new("stats", stats_array.data_type().clone(), true);
+        let schema = crate::arrow::datatypes::Schema::new(vec![
+            Field::new("path", crate::arrow::datatypes::DataType::Utf8, false),
+            Field::new(
+                "partitionValues",
+                crate::arrow::datatypes::DataType::Map(
+                    Arc::new(Field::new(
+                        "key_value",
+                        crate::arrow::datatypes::DataType::Struct(
+                            vec![
+                                Field::new("key", crate::arrow::datatypes::DataType::Utf8, false),
+                                Field::new("value", crate::arrow::datatypes::DataType::Utf8, true),
+                            ]
+                            .into(),
+                        ),
+                        false,
+                    )),
+                    false,
+                ),
+                false,
+            ),
+            Field::new("size", crate::arrow::datatypes::DataType::Int64, false),
+            Field::new(
+                "modificationTime",
+                crate::arrow::datatypes::DataType::Int64,
+                false,
             ),
-            vec![path, partitions, size, modification_time, stats],
+            stats_field,
+        ]);
+
+        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
+            Arc::new(schema),
+            vec![path, partitions, size, modification_time, stats_array],
         )?)))
     }
 }
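
The fallback keeps the serialized stats string backward compatible. Illustrative values, matching the updated expectations in kernel/tests/write.rs:

    with stats attached:  {"numRecords":3,"nullCount":{"number":0},"minValues":{"number":1},"maxValues":{"number":3},"tightBounds":true}
    fallback (no stats):  {"numRecords":3}
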
@@ -201,8 +245,22 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         path: &url::Url,
         data: Box<dyn EngineData>,
         partition_values: HashMap<String, String>,
+        stats_columns: &[String],
     ) -> DeltaResult<Box<dyn EngineData>> {
-        let parquet_metadata = self.write_parquet(path, data).await?;
+        // Collect statistics from the data during write
+        let record_batch = extract_record_batch(data.as_ref())?;
+
+        // Initialize stats collector and update with this batch
+        let mut stats_collector = StatisticsCollector::new(record_batch.schema(), stats_columns);
+        stats_collector.update(record_batch, None)?; // No mask for new file writes
+        let stats = stats_collector.finalize()?;
+
+        // Write the parquet file
+        let mut parquet_metadata = self.write_parquet(path, data).await?;
+
+        // Attach the collected statistics
+        parquet_metadata = parquet_metadata.with_stats(stats);
+
         parquet_metadata.as_record_batch(&partition_values)
     }
 }
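
For reference, the collector lifecycle used above, in isolation. record_batch and stats_columns are as in the method body; the new/update/finalize calls are exactly the ones this diff makes, and the finalize() return type is implied by with_stats(stats: StructArray):

    let mut collector = StatisticsCollector::new(record_batch.schema(), stats_columns);
    collector.update(record_batch, None)?; // None: no row mask for fresh writes
    let stats: StructArray = collector.finalize()?; // one-row struct of per-file stats
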
@@ -684,6 +742,7 @@ mod tests {
                 size,
             },
             num_records,
+            ..
         } = write_metadata;
         let expected_location = Url::parse("memory:///data/").unwrap();

kernel/tests/write.rs

Lines changed: 6 additions & 4 deletions
@@ -393,7 +393,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
             "size": size,
             "modificationTime": 0,
             "dataChange": true,
-            "stats": "{\"numRecords\":3}"
+            "stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":1},\"maxValues\":{\"number\":3},\"tightBounds\":true}"
         }
     }),
     json!({
@@ -403,7 +403,7 @@
             "size": size,
             "modificationTime": 0,
             "dataChange": true,
-            "stats": "{\"numRecords\":3}"
+            "stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":4},\"maxValues\":{\"number\":6},\"tightBounds\":true}"
         }
     }),
 ];
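
The two expected entries confirm that statistics are computed per written file: rows with values 1–3 land in the first file, 4–6 in the second. A sketch of reading such a stats string back (assuming serde_json, which these tests already use via the json! macro):

    let stats: serde_json::Value = serde_json::from_str(
        r#"{"numRecords":3,"minValues":{"number":1},"maxValues":{"number":3}}"#,
    )?;
    assert_eq!(stats["numRecords"], 3);
    assert_eq!(stats["minValues"]["number"], 1);
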
@@ -601,7 +601,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
             "size": size,
             "modificationTime": 0,
             "dataChange": false,
-            "stats": "{\"numRecords\":3}"
+            "stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":1},\"maxValues\":{\"number\":3},\"tightBounds\":true}"
         }
     }),
     json!({
@@ -613,7 +613,7 @@
             "size": size,
             "modificationTime": 0,
             "dataChange": false,
-            "stats": "{\"numRecords\":3}"
+            "stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"minValues\":{\"number\":4},\"maxValues\":{\"number\":6},\"tightBounds\":true}"
         }
     }),
 ];
@@ -1078,6 +1078,7 @@ async fn test_append_variant() -> Result<(), Box<dyn std::error::Error>> {
             write_context.target_dir(),
             Box::new(ArrowEngineData::new(data.clone())),
             HashMap::new(),
+            write_context.stats_columns(),
         )
         .await?;

@@ -1251,6 +1252,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box<dyn std::error
             write_context.target_dir(),
             Box::new(ArrowEngineData::new(data.clone())),
             HashMap::new(),
+            write_context.stats_columns(),
         )
         .await?;