Skip to content

Commit e054c1b

Browse files
committed
feat: add nullCount support to StatisticsCollector
- Add null count tracking for all columns
- Support nested struct null counts
- Merge null counts across multiple batches
- Only collect for columns in stats_columns
- Tests for null counting across batches
1 parent 9f4536c commit e054c1b

File tree

1 file changed

+252
-41
lines changed

1 file changed

+252
-41
lines changed

kernel/src/engine/default/stats.rs

Lines changed: 252 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
//! This module provides `StatisticsCollector` which accumulates statistics
44
//! across multiple Arrow RecordBatches during file writes.
55
6+
use std::collections::HashSet;
67
use std::sync::Arc;
78

8-
use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
9-
use crate::arrow::datatypes::{DataType, Field};
9+
use crate::arrow::array::{Array, ArrayRef, BooleanArray, Int64Array, RecordBatch, StructArray};
10+
use crate::arrow::datatypes::{DataType, Field, Fields};
1011
use crate::{DeltaResult, Error};
1112

1213
/// Collects statistics from RecordBatches for Delta Lake file statistics.
@@ -15,9 +16,12 @@ use crate::{DeltaResult, Error};
1516
pub(crate) struct StatisticsCollector {
    /// Total number of records across all batches.
    num_records: i64,
    /// Column names from the data schema, in schema order.
    /// Kept parallel to `null_counts`: index i names the column whose
    /// counts live in `null_counts[i]`.
    column_names: Vec<String>,
    /// Column names that should have stats collected.
    /// Lookups are by top-level column name only.
    stats_columns: HashSet<String>,
    /// Null counts per column. For structs, this is a StructArray with nested Int64Arrays.
    /// Each entry is a single-row array: a one-element Int64Array for leaf
    /// columns, or a StructArray mirroring the nested shape for struct columns.
    null_counts: Vec<ArrayRef>,
}
2226

2327
#[allow(dead_code)]
@@ -28,28 +32,146 @@ impl StatisticsCollector {
2832
/// * `data_schema` - The Arrow schema of the data being written
2933
/// * `stats_columns` - Column names that should have statistics collected
3034
pub(crate) fn new(
31-
_data_schema: Arc<crate::arrow::datatypes::Schema>,
35+
data_schema: Arc<crate::arrow::datatypes::Schema>,
3236
stats_columns: &[String],
3337
) -> Self {
38+
let stats_set: HashSet<String> = stats_columns.iter().cloned().collect();
39+
40+
let mut column_names = Vec::with_capacity(data_schema.fields().len());
41+
let mut null_counts = Vec::with_capacity(data_schema.fields().len());
42+
43+
for field in data_schema.fields() {
44+
column_names.push(field.name().clone());
45+
null_counts.push(Self::create_zero_null_count(field.data_type()));
46+
}
47+
3448
Self {
3549
num_records: 0,
36-
stats_columns: stats_columns.to_vec(),
50+
column_names,
51+
stats_columns: stats_set,
52+
null_counts,
53+
}
54+
}
55+
56+
/// Check if a column should have stats collected.
///
/// Membership is tested against the set of (top-level) column names
/// supplied as `stats_columns` at construction time.
fn should_collect_stats(&self, column_name: &str) -> bool {
    self.stats_columns.contains(column_name)
}
60+
61+
/// Create a zero-initialized null count structure for the given data type.
62+
fn create_zero_null_count(data_type: &DataType) -> ArrayRef {
63+
match data_type {
64+
DataType::Struct(fields) => {
65+
let children: Vec<ArrayRef> = fields
66+
.iter()
67+
.map(|f| Self::create_zero_null_count(f.data_type()))
68+
.collect();
69+
let null_count_fields: Fields = fields
70+
.iter()
71+
.map(|f| {
72+
let child_type = Self::null_count_data_type(f.data_type());
73+
Field::new(f.name(), child_type, true)
74+
})
75+
.collect();
76+
Arc::new(
77+
StructArray::try_new(null_count_fields, children, None)
78+
.expect("Failed to create null count struct"),
79+
)
80+
}
81+
_ => Arc::new(Int64Array::from(vec![0i64])),
82+
}
83+
}
84+
85+
/// Get the data type for null counts of a given data type.
86+
fn null_count_data_type(data_type: &DataType) -> DataType {
87+
match data_type {
88+
DataType::Struct(fields) => {
89+
let null_count_fields: Vec<Field> = fields
90+
.iter()
91+
.map(|f| Field::new(f.name(), Self::null_count_data_type(f.data_type()), true))
92+
.collect();
93+
DataType::Struct(null_count_fields.into())
94+
}
95+
_ => DataType::Int64,
96+
}
97+
}
98+
99+
/// Compute null counts for a column.
100+
fn compute_null_counts(column: &ArrayRef) -> ArrayRef {
101+
match column.data_type() {
102+
DataType::Struct(fields) => {
103+
let struct_array = column.as_any().downcast_ref::<StructArray>().unwrap();
104+
let children: Vec<ArrayRef> = (0..fields.len())
105+
.map(|i| Self::compute_null_counts(struct_array.column(i)))
106+
.collect();
107+
let null_count_fields: Fields = fields
108+
.iter()
109+
.map(|f| Field::new(f.name(), Self::null_count_data_type(f.data_type()), true))
110+
.collect();
111+
Arc::new(
112+
StructArray::try_new(null_count_fields, children, None)
113+
.expect("Failed to create null count struct"),
114+
)
115+
}
116+
_ => {
117+
let null_count = column.null_count() as i64;
118+
Arc::new(Int64Array::from(vec![null_count]))
119+
}
120+
}
121+
}
122+
123+
/// Merge two null count structures by adding them together.
124+
fn merge_null_counts(existing: &ArrayRef, new: &ArrayRef) -> ArrayRef {
125+
match existing.data_type() {
126+
DataType::Struct(fields) => {
127+
let existing_struct = existing.as_any().downcast_ref::<StructArray>().unwrap();
128+
let new_struct = new.as_any().downcast_ref::<StructArray>().unwrap();
129+
130+
let children: Vec<ArrayRef> = (0..fields.len())
131+
.map(|i| {
132+
Self::merge_null_counts(existing_struct.column(i), new_struct.column(i))
133+
})
134+
.collect();
135+
136+
let null_count_fields: Fields = fields
137+
.iter()
138+
.map(|f| Field::new(f.name(), Self::null_count_data_type(f.data_type()), true))
139+
.collect();
140+
Arc::new(
141+
StructArray::try_new(null_count_fields, children, None)
142+
.expect("Failed to merge null count struct"),
143+
)
144+
}
145+
_ => {
146+
let existing_val = existing
147+
.as_any()
148+
.downcast_ref::<Int64Array>()
149+
.unwrap()
150+
.value(0);
151+
let new_val = new.as_any().downcast_ref::<Int64Array>().unwrap().value(0);
152+
Arc::new(Int64Array::from(vec![existing_val + new_val]))
153+
}
37154
}
38155
}
39156

40157
/// Update statistics with data from a RecordBatch.
41-
///
42-
/// This method accumulates statistics across multiple batches.
43158
pub(crate) fn update(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
44159
self.num_records += batch.num_rows() as i64;
160+
161+
// Update null counts
162+
for (col_idx, column) in batch.columns().iter().enumerate() {
163+
let col_name = &self.column_names[col_idx];
164+
if self.should_collect_stats(col_name) {
165+
let batch_null_counts = Self::compute_null_counts(column);
166+
self.null_counts[col_idx] =
167+
Self::merge_null_counts(&self.null_counts[col_idx], &batch_null_counts);
168+
}
169+
}
170+
45171
Ok(())
46172
}
47173

48174
/// Finalize and return the collected statistics as a StructArray.
49-
///
50-
/// Returns a single-row StructArray with the Delta Lake stats schema:
51-
/// - numRecords: total row count
52-
/// - tightBounds: true for new files (no deletion vectors applied)
53175
pub(crate) fn finalize(&self) -> DeltaResult<StructArray> {
54176
let mut fields = Vec::new();
55177
let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
@@ -58,8 +180,37 @@ impl StatisticsCollector {
58180
fields.push(Field::new("numRecords", DataType::Int64, true));
59181
arrays.push(Arc::new(Int64Array::from(vec![self.num_records])));
60182

61-
// tightBounds - always true for new file writes
62-
// (false only when deletion vectors are applied to existing files)
183+
// nullCount - nested struct matching data schema
184+
let null_count_fields: Vec<Field> = self
185+
.column_names
186+
.iter()
187+
.enumerate()
188+
.filter(|(_, name)| self.should_collect_stats(name))
189+
.map(|(idx, name)| Field::new(name, self.null_counts[idx].data_type().clone(), true))
190+
.collect();
191+
192+
if !null_count_fields.is_empty() {
193+
let null_count_arrays: Vec<ArrayRef> = self
194+
.column_names
195+
.iter()
196+
.enumerate()
197+
.filter(|(_, name)| self.should_collect_stats(name))
198+
.map(|(idx, _)| self.null_counts[idx].clone())
199+
.collect();
200+
201+
let null_count_struct =
202+
StructArray::try_new(null_count_fields.into(), null_count_arrays, None)
203+
.map_err(|e| Error::generic(format!("Failed to create nullCount: {e}")))?;
204+
205+
fields.push(Field::new(
206+
"nullCount",
207+
null_count_struct.data_type().clone(),
208+
true,
209+
));
210+
arrays.push(Arc::new(null_count_struct));
211+
}
212+
213+
// tightBounds
63214
fields.push(Field::new("tightBounds", DataType::Boolean, true));
64215
arrays.push(Arc::new(BooleanArray::from(vec![true])));
65216

@@ -71,7 +222,7 @@ impl StatisticsCollector {
71222
#[cfg(test)]
72223
mod tests {
73224
use super::*;
74-
use crate::arrow::array::{Array, Int64Array};
225+
use crate::arrow::array::{Array, Int64Array, StringArray};
75226
use crate::arrow::datatypes::Schema;
76227

77228
#[test]
@@ -88,7 +239,6 @@ mod tests {
88239
collector.update(&batch).unwrap();
89240
let stats = collector.finalize().unwrap();
90241

91-
// Check numRecords
92242
assert_eq!(stats.len(), 1);
93243
let num_records = stats
94244
.column_by_name("numRecords")
@@ -97,63 +247,124 @@ mod tests {
97247
.downcast_ref::<Int64Array>()
98248
.unwrap();
99249
assert_eq!(num_records.value(0), 3);
250+
}
251+
252+
#[test]
fn test_statistics_collector_null_counts() {
    // Two-column schema: non-nullable "id", nullable "value".
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            // "value" carries exactly one null.
            Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])),
        ],
    )
    .unwrap();

    // Collect stats for both columns.
    let mut collector =
        StatisticsCollector::new(schema, &["id".to_string(), "value".to_string()]);
    collector.update(&batch).unwrap();
    let stats = collector.finalize().unwrap();

    // Check nullCount struct
    let null_count = stats
        .column_by_name("nullCount")
        .unwrap()
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // id has 0 nulls
    let id_null_count = null_count
        .column_by_name("id")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(id_null_count.value(0), 0);

    // value has 1 null
    let value_null_count = null_count
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    assert_eq!(value_null_count.value(0), 1);
}
110299

111300
#[test]
fn test_statistics_collector_multiple_batches_null_counts() {
    // Single nullable string column.
    let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Utf8, true)]));

    // batch1: one null.
    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec![Some("a"), None]))],
    )
    .unwrap();

    // batch2: two nulls.
    let batch2 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec![None, None, Some("b")]))],
    )
    .unwrap();

    let mut collector = StatisticsCollector::new(schema, &["value".to_string()]);
    collector.update(&batch1).unwrap();
    collector.update(&batch2).unwrap();
    let stats = collector.finalize().unwrap();

    let null_count = stats
        .column_by_name("nullCount")
        .unwrap()
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    let value_null_count = null_count
        .column_by_name("value")
        .unwrap()
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    // 1 null in batch1 + 2 nulls in batch2 = 3 total
    assert_eq!(value_null_count.value(0), 3);
}
138337

139338
#[test]
fn test_statistics_collector_respects_stats_columns() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])),
        ],
    )
    .unwrap();

    // Only collect stats for "id", not "value"
    let mut collector = StatisticsCollector::new(schema, &["id".to_string()]);
    collector.update(&batch).unwrap();
    let stats = collector.finalize().unwrap();

    let null_count = stats
        .column_by_name("nullCount")
        .unwrap()
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // Only id should be present
    assert!(null_count.column_by_name("id").is_some());
    assert!(null_count.column_by_name("value").is_none());
}
159370
}

0 commit comments

Comments
 (0)