Commit ee9ac0c

feat: add StatisticsCollector core with numRecords
- Add StatisticsCollector struct with new(), update(), finalize()
- Track numRecords across multiple RecordBatches
- Output StructArray with {numRecords, tightBounds}
- Basic unit tests for single/multiple batches

This is the foundation for full stats collection; column-level stats (nullCount, minValues, maxValues) will be added in subsequent PRs.
1 parent 05fad0e commit ee9ac0c

File tree

2 files changed: +160 -0 lines changed

kernel/src/engine/default/mod.rs
kernel/src/engine/default/stats.rs

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
+pub mod stats;
pub mod storage;

/// Converts a Stream-producing future to a synchronous iterator.

kernel/src/engine/default/stats.rs

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
//! Statistics collection for Delta Lake file writes.
//!
//! This module provides `StatisticsCollector` which accumulates statistics
//! across multiple Arrow RecordBatches during file writes.

use std::sync::Arc;

use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
use crate::arrow::datatypes::{DataType, Field};
use crate::{DeltaResult, Error};

/// Collects statistics from RecordBatches for Delta Lake file statistics.
/// Supports streaming accumulation across multiple batches.
#[allow(dead_code)]
pub(crate) struct StatisticsCollector {
    /// Total number of records across all batches.
    num_records: i64,
    /// Column names that should have stats collected.
    #[allow(dead_code)]
    stats_columns: Vec<String>,
}

#[allow(dead_code)]
impl StatisticsCollector {
    /// Create a new statistics collector.
    ///
    /// # Arguments
    /// * `data_schema` - The Arrow schema of the data being written
    /// * `stats_columns` - Column names that should have statistics collected
    pub(crate) fn new(
        _data_schema: Arc<crate::arrow::datatypes::Schema>,
        stats_columns: &[String],
    ) -> Self {
        Self {
            num_records: 0,
            stats_columns: stats_columns.to_vec(),
        }
    }

    /// Update statistics with data from a RecordBatch.
    ///
    /// This method accumulates statistics across multiple batches.
    pub(crate) fn update(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
        self.num_records += batch.num_rows() as i64;
        Ok(())
    }

    /// Finalize and return the collected statistics as a StructArray.
    ///
    /// Returns a single-row StructArray with the Delta Lake stats schema:
    /// - numRecords: total row count
    /// - tightBounds: true for new files (no deletion vectors applied)
    pub(crate) fn finalize(&self) -> DeltaResult<StructArray> {
        let mut fields = Vec::new();
        let mut arrays: Vec<Arc<dyn Array>> = Vec::new();

        // numRecords
        fields.push(Field::new("numRecords", DataType::Int64, true));
        arrays.push(Arc::new(Int64Array::from(vec![self.num_records])));

        // tightBounds - always true for new file writes
        // (false only when deletion vectors are applied to existing files)
        fields.push(Field::new("tightBounds", DataType::Boolean, true));
        arrays.push(Arc::new(BooleanArray::from(vec![true])));

        StructArray::try_new(fields.into(), arrays, None)
            .map_err(|e| Error::generic(format!("Failed to create stats struct: {e}")))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array::{Array, Int64Array};
    use crate::arrow::datatypes::Schema;

    #[test]
    fn test_statistics_collector_single_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let mut collector = StatisticsCollector::new(schema, &["id".to_string()]);
        collector.update(&batch).unwrap();
        let stats = collector.finalize().unwrap();

        // Check numRecords
        assert_eq!(stats.len(), 1);
        let num_records = stats
            .column_by_name("numRecords")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(num_records.value(0), 3);

        // Check tightBounds
        let tight_bounds = stats
            .column_by_name("tightBounds")
            .unwrap()
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(tight_bounds.value(0));
    }

    #[test]
    fn test_statistics_collector_multiple_batches() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let batch2 =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![4, 5]))])
                .unwrap();

        let mut collector = StatisticsCollector::new(schema, &["id".to_string()]);
        collector.update(&batch1).unwrap();
        collector.update(&batch2).unwrap();
        let stats = collector.finalize().unwrap();

        let num_records = stats
            .column_by_name("numRecords")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(num_records.value(0), 5);
    }

    #[test]
    fn test_statistics_collector_empty_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let empty: Vec<i64> = vec![];
        let batch =
            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(empty))]).unwrap();

        let mut collector = StatisticsCollector::new(schema, &[]);
        collector.update(&batch).unwrap();
        let stats = collector.finalize().unwrap();

        let num_records = stats
            .column_by_name("numRecords")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(num_records.value(0), 0);
    }
}
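
For orientation, below is a minimal usage sketch of the API this commit adds: a caller constructs the collector once, feeds it every RecordBatch it writes, then finalizes to get the single-row stats StructArray. The collect_write_stats helper, the slice of batches, and the assumption that the caller is a sibling module under engine/default are illustrative and not part of this commit; only the new/update/finalize calls come from the code above.

    use std::sync::Arc;

    // Assumed caller location: a sibling module inside engine/default (hypothetical).
    use super::stats::StatisticsCollector;
    use crate::arrow::array::{RecordBatch, StructArray};
    use crate::arrow::datatypes::Schema;
    use crate::DeltaResult;

    // Hypothetical helper (not in this commit): accumulate stats over batches produced by a writer.
    fn collect_write_stats(
        data_schema: Arc<Schema>,
        stats_columns: &[String],
        batches: &[RecordBatch],
    ) -> DeltaResult<StructArray> {
        let mut collector = StatisticsCollector::new(data_schema, stats_columns);
        for batch in batches {
            // Currently only numRecords is accumulated; later PRs add per-column stats.
            collector.update(batch)?;
        }
        // Single-row StructArray: {numRecords, tightBounds}.
        collector.finalize()
    }

As the finalize docs note, tightBounds is always true in this path, since freshly written files have no deletion vectors applied.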
