Skip to content

Commit 1ce48c3

Browse files
committed
feat: add StatisticsCollector core with numRecords
- Add StatisticsCollector struct with new(), update(), finalize() - Track numRecords across multiple RecordBatches - Output StructArray with {numRecords, tightBounds} - Basic unit tests for single/multiple batches This is the foundation for full stats collection, adding column-level stats (nullCount, minValues, maxValues) in subsequent PRs.
1 parent 2630a7a commit 1ce48c3

File tree

4 files changed

+173
-20
lines changed

4 files changed

+173
-20
lines changed

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod file_stream;
3333
pub mod filesystem;
3434
pub mod json;
3535
pub mod parquet;
36+
pub mod stats;
3637
pub mod storage;
3738

3839
/// Converts a Stream-producing future to a synchronous iterator.

kernel/src/engine/default/parquet.rs

Lines changed: 76 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::sync::Arc;
77
use delta_kernel_derive::internal_api;
88

99
use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
10-
use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
10+
use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray, StructArray};
1111
use crate::arrow::datatypes::{DataType, Field};
1212
use crate::parquet::arrow::arrow_reader::{
1313
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
@@ -23,9 +23,10 @@ use object_store::{DynObjectStore, ObjectStore};
2323
use uuid::Uuid;
2424

2525
use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
26+
use super::stats::collect_stats;
2627
use super::UrlExt;
2728
use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
28-
use crate::engine::arrow_data::ArrowEngineData;
29+
use crate::engine::arrow_data::{extract_record_batch, ArrowEngineData};
2930
use crate::engine::arrow_utils::{
3031
fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
3132
RowIndexBuilder,
@@ -54,16 +55,25 @@ pub struct DataFileMetadata {
5455
file_meta: FileMeta,
5556
// NB: We use usize instead of u64 since arrow uses usize for record batch sizes
5657
num_records: usize,
58+
/// Collected statistics for this file (optional).
59+
stats: Option<StructArray>,
5760
}
5861

5962
impl DataFileMetadata {
6063
pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
6164
Self {
6265
file_meta,
6366
num_records,
67+
stats: None,
6468
}
6569
}
6670

71+
/// Set the collected statistics for this file.
72+
pub fn with_stats(mut self, stats: StructArray) -> Self {
73+
self.stats = Some(stats);
74+
self
75+
}
76+
6777
/// Convert DataFileMetadata into a record batch which matches the schema returned by
6878
/// [`add_files_schema`].
6979
///
@@ -81,6 +91,7 @@ impl DataFileMetadata {
8191
size,
8292
},
8393
num_records,
94+
stats,
8495
} = self;
8596
// create the record batch of the write metadata
8697
let path = Arc::new(StringArray::from(vec![location.to_string()]));
@@ -104,20 +115,53 @@ impl DataFileMetadata {
104115
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
105116
let size = Arc::new(Int64Array::from(vec![size]));
106117
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
107-
let stats = Arc::new(StructArray::try_new_with_length(
108-
vec![Field::new("numRecords", DataType::Int64, true)].into(),
109-
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
110-
None,
111-
1,
112-
)?);
113118

114-
Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
115-
Arc::new(
116-
crate::transaction::BASE_ADD_FILES_SCHEMA
117-
.as_ref()
118-
.try_into_arrow()?,
119+
// Use full stats if available, otherwise just numRecords
120+
let stats_array: Arc<StructArray> = if let Some(full_stats) = stats {
121+
Arc::new(full_stats.clone())
122+
} else {
123+
Arc::new(StructArray::try_new_with_length(
124+
vec![Field::new("numRecords", DataType::Int64, true)].into(),
125+
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
126+
None,
127+
1,
128+
)?)
129+
};
130+
131+
// Build schema dynamically based on stats
132+
let stats_field = Field::new("stats", stats_array.data_type().clone(), true);
133+
let schema = crate::arrow::datatypes::Schema::new(vec![
134+
Field::new("path", crate::arrow::datatypes::DataType::Utf8, false),
135+
Field::new(
136+
"partitionValues",
137+
crate::arrow::datatypes::DataType::Map(
138+
Arc::new(Field::new(
139+
"key_value",
140+
crate::arrow::datatypes::DataType::Struct(
141+
vec![
142+
Field::new("key", crate::arrow::datatypes::DataType::Utf8, false),
143+
Field::new("value", crate::arrow::datatypes::DataType::Utf8, true),
144+
]
145+
.into(),
146+
),
147+
false,
148+
)),
149+
false,
150+
),
151+
false,
119152
),
120-
vec![path, partitions, size, modification_time, stats],
153+
Field::new("size", crate::arrow::datatypes::DataType::Int64, false),
154+
Field::new(
155+
"modificationTime",
156+
crate::arrow::datatypes::DataType::Int64,
157+
false,
158+
),
159+
stats_field,
160+
]);
161+
162+
Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
163+
Arc::new(schema),
164+
vec![path, partitions, size, modification_time, stats_array],
121165
)?)))
122166
}
123167
}
@@ -201,9 +245,24 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
201245
path: &url::Url,
202246
data: Box<dyn EngineData>,
203247
partition_values: HashMap<String, String>,
204-
_stats_columns: Option<&[String]>,
248+
stats_columns: Option<&[String]>,
205249
) -> DeltaResult<Box<dyn EngineData>> {
206-
let parquet_metadata = self.write_parquet(path, data).await?;
250+
// Collect statistics from the data before writing if stats_columns provided
251+
let stats = if let Some(cols) = stats_columns {
252+
let record_batch = extract_record_batch(data.as_ref())?;
253+
Some(collect_stats(record_batch, cols)?)
254+
} else {
255+
None
256+
};
257+
258+
// Write the parquet file
259+
let mut parquet_metadata = self.write_parquet(path, data).await?;
260+
261+
// Attach the collected statistics if present
262+
if let Some(s) = stats {
263+
parquet_metadata = parquet_metadata.with_stats(s);
264+
}
265+
207266
parquet_metadata.as_record_batch(&partition_values)
208267
}
209268
}
@@ -685,6 +744,7 @@ mod tests {
685744
size,
686745
},
687746
num_records,
747+
..
688748
} = write_metadata;
689749
let expected_location = Url::parse("memory:///data/").unwrap();
690750

kernel/src/engine/default/stats.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
//! Statistics collection for Delta Lake file writes.
2+
//!
3+
//! Provides `collect_stats` to compute statistics for a RecordBatch during file writes.
4+
5+
use std::sync::Arc;
6+
7+
use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
8+
use crate::arrow::datatypes::{DataType, Field};
9+
use crate::{DeltaResult, Error};
10+
11+
/// Collect statistics from a RecordBatch for Delta Lake file statistics.
12+
///
13+
/// Returns a StructArray with the following fields:
14+
/// - `numRecords`: total row count
15+
/// - `tightBounds`: always true for new file writes
16+
///
17+
/// # Arguments
18+
/// * `batch` - The RecordBatch to collect statistics from
19+
/// * `_stats_columns` - Column names that should have statistics collected (reserved for future use)
20+
pub(crate) fn collect_stats(
21+
batch: &RecordBatch,
22+
_stats_columns: &[String],
23+
) -> DeltaResult<StructArray> {
24+
let mut fields = Vec::new();
25+
let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
26+
27+
// numRecords
28+
fields.push(Field::new("numRecords", DataType::Int64, true));
29+
arrays.push(Arc::new(Int64Array::from(vec![batch.num_rows() as i64])));
30+
31+
// tightBounds - always true for new file writes
32+
fields.push(Field::new("tightBounds", DataType::Boolean, true));
33+
arrays.push(Arc::new(BooleanArray::from(vec![true])));
34+
35+
StructArray::try_new(fields.into(), arrays, None)
36+
.map_err(|e| Error::generic(format!("Failed to create stats struct: {e}")))
37+
}
38+
39+
#[cfg(test)]
40+
mod tests {
41+
use super::*;
42+
use crate::arrow::array::Int64Array;
43+
use crate::arrow::datatypes::Schema;
44+
45+
#[test]
46+
fn test_collect_stats_basic() {
47+
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
48+
49+
let batch =
50+
RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
51+
52+
let stats = collect_stats(&batch, &["id".to_string()]).unwrap();
53+
54+
assert_eq!(stats.len(), 1);
55+
56+
// Check numRecords
57+
let num_records = stats
58+
.column_by_name("numRecords")
59+
.unwrap()
60+
.as_any()
61+
.downcast_ref::<Int64Array>()
62+
.unwrap();
63+
assert_eq!(num_records.value(0), 3);
64+
65+
// Check tightBounds
66+
let tight_bounds = stats
67+
.column_by_name("tightBounds")
68+
.unwrap()
69+
.as_any()
70+
.downcast_ref::<BooleanArray>()
71+
.unwrap();
72+
assert!(tight_bounds.value(0));
73+
}
74+
75+
#[test]
76+
fn test_collect_stats_empty_batch() {
77+
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
78+
79+
let empty: Vec<i64> = vec![];
80+
let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(empty))]).unwrap();
81+
82+
let stats = collect_stats(&batch, &[]).unwrap();
83+
84+
let num_records = stats
85+
.column_by_name("numRecords")
86+
.unwrap()
87+
.as_any()
88+
.downcast_ref::<Int64Array>()
89+
.unwrap();
90+
assert_eq!(num_records.value(0), 0);
91+
}
92+
}

kernel/tests/write.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
393393
"size": size,
394394
"modificationTime": 0,
395395
"dataChange": true,
396-
"stats": "{\"numRecords\":3}"
396+
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
397397
}
398398
}),
399399
json!({
@@ -403,7 +403,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
403403
"size": size,
404404
"modificationTime": 0,
405405
"dataChange": true,
406-
"stats": "{\"numRecords\":3}"
406+
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
407407
}
408408
}),
409409
];
@@ -601,7 +601,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
601601
"size": size,
602602
"modificationTime": 0,
603603
"dataChange": false,
604-
"stats": "{\"numRecords\":3}"
604+
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
605605
}
606606
}),
607607
json!({
@@ -613,7 +613,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
613613
"size": size,
614614
"modificationTime": 0,
615615
"dataChange": false,
616-
"stats": "{\"numRecords\":3}"
616+
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
617617
}
618618
}),
619619
];

0 commit comments

Comments
 (0)