Skip to content

Commit d72d2fc

Browse files
committed
nullcount
1 parent e361bff commit d72d2fc

File tree

2 files changed

+181
-9
lines changed

2 files changed

+181
-9
lines changed

kernel/src/engine/default/stats.rs

Lines changed: 177 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,110 @@
22
//!
33
//! Provides `collect_stats` to compute statistics for a RecordBatch during file writes.
44
5+
use std::collections::HashSet;
56
use std::sync::Arc;
67

7-
use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
8-
use crate::arrow::datatypes::{DataType, Field};
8+
use crate::arrow::array::{Array, ArrayRef, BooleanArray, Int64Array, RecordBatch, StructArray};
9+
use crate::arrow::datatypes::{DataType, Field, Fields};
910
use crate::{DeltaResult, Error};
1011

12+
/// Compute null count for a column, handling nested structs.
13+
fn compute_null_count(column: &ArrayRef) -> DeltaResult<ArrayRef> {
14+
match column.data_type() {
15+
DataType::Struct(fields) => {
16+
let struct_array = column
17+
.as_any()
18+
.downcast_ref::<StructArray>()
19+
.ok_or_else(|| Error::generic("Expected StructArray for struct column"))?;
20+
let children: Vec<ArrayRef> = (0..fields.len())
21+
.map(|i| compute_null_count(struct_array.column(i)))
22+
.collect::<DeltaResult<Vec<_>>>()?;
23+
let null_count_fields: Fields = fields
24+
.iter()
25+
.map(|f| Field::new(f.name(), null_count_data_type(f.data_type()), true))
26+
.collect();
27+
Ok(Arc::new(
28+
StructArray::try_new(null_count_fields, children, None)
29+
.map_err(|e| Error::generic(format!("null count struct: {e}")))?,
30+
))
31+
}
32+
_ => Ok(Arc::new(Int64Array::from(vec![column.null_count() as i64]))),
33+
}
34+
}
35+
36+
/// Get the data type for null counts of a given data type.
37+
fn null_count_data_type(data_type: &DataType) -> DataType {
38+
match data_type {
39+
DataType::Struct(fields) => {
40+
let null_count_fields: Vec<Field> = fields
41+
.iter()
42+
.map(|f| Field::new(f.name(), null_count_data_type(f.data_type()), true))
43+
.collect();
44+
DataType::Struct(null_count_fields.into())
45+
}
46+
_ => DataType::Int64,
47+
}
48+
}
49+
1150
/// Collect statistics from a RecordBatch for Delta Lake file statistics.
1251
///
1352
/// Returns a StructArray with the following fields:
1453
/// - `numRecords`: total row count
54+
/// - `nullCount`: nested struct with null counts per column (if stats_columns is non-empty)
1555
/// - `tightBounds`: always true for new file writes
1656
///
1757
/// # Arguments
1858
/// * `batch` - The RecordBatch to collect statistics from
19-
/// * `_stats_columns` - Column names that should have statistics collected (reserved for future use)
59+
/// * `stats_columns` - Column names that should have statistics collected.
60+
/// Only these columns will appear in nullCount.
2061
pub(crate) fn collect_stats(
2162
batch: &RecordBatch,
22-
_stats_columns: &[String],
63+
stats_columns: &[String],
2364
) -> DeltaResult<StructArray> {
65+
let stats_set: HashSet<&str> = stats_columns.iter().map(|s| s.as_str()).collect();
66+
let schema = batch.schema();
67+
2468
let mut fields = Vec::new();
2569
let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
2670

2771
// numRecords
2872
fields.push(Field::new("numRecords", DataType::Int64, true));
2973
arrays.push(Arc::new(Int64Array::from(vec![batch.num_rows() as i64])));
3074

75+
// Collect null counts for each column that's in stats_columns
76+
let mut null_count_fields = Vec::new();
77+
let mut null_count_arrays = Vec::new();
78+
79+
for (col_idx, field) in schema.fields().iter().enumerate() {
80+
if !stats_set.contains(field.name().as_str()) {
81+
continue;
82+
}
83+
84+
let column = batch.column(col_idx);
85+
86+
// Null count
87+
let null_count = compute_null_count(column)?;
88+
null_count_fields.push(Field::new(
89+
field.name(),
90+
null_count.data_type().clone(),
91+
true,
92+
));
93+
null_count_arrays.push(null_count);
94+
}
95+
96+
// nullCount struct
97+
if !null_count_fields.is_empty() {
98+
let null_count_struct =
99+
StructArray::try_new(null_count_fields.into(), null_count_arrays, None)
100+
.map_err(|e| Error::generic(format!("Failed to create nullCount: {e}")))?;
101+
fields.push(Field::new(
102+
"nullCount",
103+
null_count_struct.data_type().clone(),
104+
true,
105+
));
106+
arrays.push(Arc::new(null_count_struct));
107+
}
108+
31109
// tightBounds - always true for new file writes
32110
fields.push(Field::new("tightBounds", DataType::Boolean, true));
33111
arrays.push(Arc::new(BooleanArray::from(vec![true])));
@@ -39,7 +117,7 @@ pub(crate) fn collect_stats(
39117
#[cfg(test)]
40118
mod tests {
41119
use super::*;
42-
use crate::arrow::array::Int64Array;
120+
use crate::arrow::array::{Int64Array, StringArray};
43121
use crate::arrow::datatypes::Schema;
44122

45123
#[test]
@@ -89,4 +167,98 @@ mod tests {
89167
.unwrap();
90168
assert_eq!(num_records.value(0), 0);
91169
}
170+
171+
#[test]
fn test_collect_stats_null_counts() {
    // Two columns: "id" (no nulls) and "value" (exactly one null).
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));
    let ids: Arc<dyn Array> = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let values: Arc<dyn Array> = Arc::new(StringArray::from(vec![Some("a"), None, Some("c")]));
    let batch = RecordBatch::try_new(schema, vec![ids, values]).unwrap();

    let requested = vec!["id".to_string(), "value".to_string()];
    let stats = collect_stats(&batch, &requested).unwrap();

    // nullCount must be a struct keyed by column name.
    let null_count = stats
        .column_by_name("nullCount")
        .unwrap()
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // Helper: pull the single Int64 count for a named column.
    let count_for = |name: &str| -> i64 {
        null_count
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap()
            .value(0)
    };

    // id has 0 nulls; value has 1 null.
    assert_eq!(count_for("id"), 0);
    assert_eq!(count_for("value"), 1);
}
215+
216+
#[test]
fn test_collect_stats_respects_stats_columns() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("value", DataType::Utf8, true),
    ]));
    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(Int64Array::from(vec![1, 2, 3])),
        Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    // Request stats for "id" only — "value" must be excluded from nullCount.
    let requested = ["id".to_string()];
    let stats = collect_stats(&batch, &requested).unwrap();

    let null_count = stats
        .column_by_name("nullCount")
        .unwrap()
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();

    // Only the requested column appears.
    assert!(null_count.column_by_name("id").is_some());
    assert!(null_count.column_by_name("value").is_none());
}
246+
247+
#[test]
fn test_collect_stats_empty_stats_columns() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids: Arc<dyn Array> = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();

    // With no stats columns requested, only the always-present fields remain.
    let stats = collect_stats(&batch, &[]).unwrap();

    assert!(stats.column_by_name("numRecords").is_some());
    assert!(stats.column_by_name("tightBounds").is_some());
    // nullCount is omitted entirely rather than emitted as an empty struct.
    assert!(stats.column_by_name("nullCount").is_none());
}
92264
}

kernel/tests/write.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
393393
"size": size,
394394
"modificationTime": 0,
395395
"dataChange": true,
396-
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
396+
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"tightBounds\":true}"
397397
}
398398
}),
399399
json!({
@@ -403,7 +403,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
403403
"size": size,
404404
"modificationTime": 0,
405405
"dataChange": true,
406-
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
406+
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"tightBounds\":true}"
407407
}
408408
}),
409409
];
@@ -601,7 +601,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
601601
"size": size,
602602
"modificationTime": 0,
603603
"dataChange": false,
604-
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
604+
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"tightBounds\":true}"
605605
}
606606
}),
607607
json!({
@@ -613,7 +613,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
613613
"size": size,
614614
"modificationTime": 0,
615615
"dataChange": false,
616-
"stats": "{\"numRecords\":3,\"tightBounds\":true}"
616+
"stats": "{\"numRecords\":3,\"nullCount\":{\"number\":0},\"tightBounds\":true}"
617617
}
618618
}),
619619
];

0 commit comments

Comments (0)