Skip to content

Commit ba5b03f

Browse files
committed
feat: add StatisticsCollector core with numRecords
- Add StatisticsCollector struct with new(), update(), finalize()
- Track numRecords across multiple RecordBatches
- Output StructArray with {numRecords, tightBounds}
- Basic unit tests for single/multiple batches

This is the foundation for full stats collection; column-level stats (nullCount, minValues, maxValues) will be added in subsequent PRs.
1 parent 8d38b03 commit ba5b03f

File tree

2 files changed

+158
-0
lines changed

2 files changed

+158
-0
lines changed

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ pub mod file_stream;
3333
pub mod filesystem;
3434
pub mod json;
3535
pub mod parquet;
36+
pub mod stats;
3637
pub mod storage;
3738

3839
/// Converts a Stream-producing future to a synchronous iterator.

kernel/src/engine/default/stats.rs

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
//! Statistics collection for Delta Lake file writes.
2+
//!
3+
//! This module provides `StatisticsCollector` which accumulates statistics
4+
//! across multiple Arrow RecordBatches during file writes.
5+
6+
use std::sync::Arc;
7+
8+
use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
9+
use crate::arrow::datatypes::{DataType, Field};
10+
use crate::{DeltaResult, Error};
11+
12+
/// Collects statistics from RecordBatches for Delta Lake file statistics.
/// Supports streaming accumulation across multiple batches.
pub(crate) struct StatisticsCollector {
    /// Total number of records across all batches.
    num_records: i64,
    /// Column names that should have stats collected.
    // Not read yet: only numRecords is collected so far; column-level stats
    // (nullCount/minValues/maxValues) that consume this list land in
    // follow-up PRs, so silence the dead-code lint until then.
    #[allow(dead_code)]
    stats_columns: Vec<String>,
}
21+
22+
impl StatisticsCollector {
23+
/// Create a new statistics collector.
24+
///
25+
/// # Arguments
26+
/// * `data_schema` - The Arrow schema of the data being written
27+
/// * `stats_columns` - Column names that should have statistics collected
28+
pub(crate) fn new(
29+
_data_schema: Arc<crate::arrow::datatypes::Schema>,
30+
stats_columns: &[String],
31+
) -> Self {
32+
Self {
33+
num_records: 0,
34+
stats_columns: stats_columns.to_vec(),
35+
}
36+
}
37+
38+
/// Update statistics with data from a RecordBatch.
39+
///
40+
/// This method accumulates statistics across multiple batches.
41+
pub(crate) fn update(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
42+
self.num_records += batch.num_rows() as i64;
43+
Ok(())
44+
}
45+
46+
/// Finalize and return the collected statistics as a StructArray.
47+
///
48+
/// Returns a single-row StructArray with the Delta Lake stats schema:
49+
/// - numRecords: total row count
50+
/// - tightBounds: true for new files (no deletion vectors applied)
51+
pub(crate) fn finalize(&self) -> DeltaResult<StructArray> {
52+
let mut fields = Vec::new();
53+
let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
54+
55+
// numRecords
56+
fields.push(Field::new("numRecords", DataType::Int64, true));
57+
arrays.push(Arc::new(Int64Array::from(vec![self.num_records])));
58+
59+
// tightBounds - always true for new file writes
60+
// (false only when deletion vectors are applied to existing files)
61+
fields.push(Field::new("tightBounds", DataType::Boolean, true));
62+
arrays.push(Arc::new(BooleanArray::from(vec![true])));
63+
64+
StructArray::try_new(fields.into(), arrays, None)
65+
.map_err(|e| Error::generic(format!("Failed to create stats struct: {e}")))
66+
}
67+
}
68+
69+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array::{Array, Int64Array};
    use crate::arrow::datatypes::Schema;

    /// Schema with a single non-nullable Int64 column named `id`.
    fn id_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]))
    }

    /// Build a one-column Int64 batch containing `values`.
    fn int64_batch(schema: Arc<Schema>, values: Vec<i64>) -> RecordBatch {
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(values))]).unwrap()
    }

    /// Extract the `numRecords` value from a finalized stats struct.
    fn num_records(stats: &StructArray) -> i64 {
        stats
            .column_by_name("numRecords")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    }

    #[test]
    fn test_statistics_collector_single_batch() {
        let schema = id_schema();
        let batch = int64_batch(schema.clone(), vec![1, 2, 3]);

        let mut collector = StatisticsCollector::new(schema, &["id".to_string()]);
        collector.update(&batch).unwrap();
        let stats = collector.finalize().unwrap();

        // One stats row covering the whole file, with the full row count.
        assert_eq!(stats.len(), 1);
        assert_eq!(num_records(&stats), 3);

        // New files always report tight bounds.
        let tight_bounds = stats
            .column_by_name("tightBounds")
            .unwrap()
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(tight_bounds.value(0));
    }

    #[test]
    fn test_statistics_collector_multiple_batches() {
        let schema = id_schema();
        let first = int64_batch(schema.clone(), vec![1, 2, 3]);
        let second = int64_batch(schema.clone(), vec![4, 5]);

        let mut collector = StatisticsCollector::new(schema, &["id".to_string()]);
        collector.update(&first).unwrap();
        collector.update(&second).unwrap();

        // Counts accumulate across batches: 3 + 2 = 5.
        assert_eq!(num_records(&collector.finalize().unwrap()), 5);
    }

    #[test]
    fn test_statistics_collector_empty_batch() {
        let schema = id_schema();
        let batch = int64_batch(schema.clone(), Vec::new());

        let mut collector = StatisticsCollector::new(schema, &[]);
        collector.update(&batch).unwrap();

        // An empty batch contributes zero records.
        assert_eq!(num_records(&collector.finalize().unwrap()), 0);
    }
}

0 commit comments

Comments
 (0)