33//! This module provides `StatisticsCollector` which accumulates statistics
44//! across multiple Arrow RecordBatches during file writes.
55
6+ use std:: collections:: HashSet ;
67use std:: sync:: Arc ;
78
8- use crate :: arrow:: array:: { Array , BooleanArray , Int64Array , RecordBatch , StructArray } ;
9- use crate :: arrow:: datatypes:: { DataType , Field } ;
9+ use crate :: arrow:: array:: { Array , ArrayRef , BooleanArray , Int64Array , RecordBatch , StructArray } ;
10+ use crate :: arrow:: datatypes:: { DataType , Field , Fields } ;
1011use crate :: { DeltaResult , Error } ;
1112
1213/// Collects statistics from RecordBatches for Delta Lake file statistics.
1314/// Supports streaming accumulation across multiple batches.
1415pub ( crate ) struct StatisticsCollector {
1516 /// Total number of records across all batches.
1617 num_records : i64 ,
18+ /// Column names from the data schema.
19+ column_names : Vec < String > ,
1720 /// Column names that should have stats collected.
18- #[ allow( dead_code) ]
19- stats_columns : Vec < String > ,
21+ stats_columns : HashSet < String > ,
22+ /// Null counts per column. For structs, this is a StructArray with nested Int64Arrays.
23+ null_counts : Vec < ArrayRef > ,
2024}
2125
2226impl StatisticsCollector {
@@ -26,28 +30,146 @@ impl StatisticsCollector {
2630 /// * `data_schema` - The Arrow schema of the data being written
2731 /// * `stats_columns` - Column names that should have statistics collected
2832 pub ( crate ) fn new (
29- _data_schema : Arc < crate :: arrow:: datatypes:: Schema > ,
33+ data_schema : Arc < crate :: arrow:: datatypes:: Schema > ,
3034 stats_columns : & [ String ] ,
3135 ) -> Self {
36+ let stats_set: HashSet < String > = stats_columns. iter ( ) . cloned ( ) . collect ( ) ;
37+
38+ let mut column_names = Vec :: with_capacity ( data_schema. fields ( ) . len ( ) ) ;
39+ let mut null_counts = Vec :: with_capacity ( data_schema. fields ( ) . len ( ) ) ;
40+
41+ for field in data_schema. fields ( ) {
42+ column_names. push ( field. name ( ) . clone ( ) ) ;
43+ null_counts. push ( Self :: create_zero_null_count ( field. data_type ( ) ) ) ;
44+ }
45+
3246 Self {
3347 num_records : 0 ,
34- stats_columns : stats_columns. to_vec ( ) ,
48+ column_names,
49+ stats_columns : stats_set,
50+ null_counts,
51+ }
52+ }
53+
54+ /// Check if a column should have stats collected.
55+ fn should_collect_stats ( & self , column_name : & str ) -> bool {
56+ self . stats_columns . contains ( column_name)
57+ }
58+
59+ /// Create a zero-initialized null count structure for the given data type.
60+ fn create_zero_null_count ( data_type : & DataType ) -> ArrayRef {
61+ match data_type {
62+ DataType :: Struct ( fields) => {
63+ let children: Vec < ArrayRef > = fields
64+ . iter ( )
65+ . map ( |f| Self :: create_zero_null_count ( f. data_type ( ) ) )
66+ . collect ( ) ;
67+ let null_count_fields: Fields = fields
68+ . iter ( )
69+ . map ( |f| {
70+ let child_type = Self :: null_count_data_type ( f. data_type ( ) ) ;
71+ Field :: new ( f. name ( ) , child_type, true )
72+ } )
73+ . collect ( ) ;
74+ Arc :: new (
75+ StructArray :: try_new ( null_count_fields, children, None )
76+ . expect ( "Failed to create null count struct" ) ,
77+ )
78+ }
79+ _ => Arc :: new ( Int64Array :: from ( vec ! [ 0i64 ] ) ) ,
80+ }
81+ }
82+
83+ /// Get the data type for null counts of a given data type.
84+ fn null_count_data_type ( data_type : & DataType ) -> DataType {
85+ match data_type {
86+ DataType :: Struct ( fields) => {
87+ let null_count_fields: Vec < Field > = fields
88+ . iter ( )
89+ . map ( |f| Field :: new ( f. name ( ) , Self :: null_count_data_type ( f. data_type ( ) ) , true ) )
90+ . collect ( ) ;
91+ DataType :: Struct ( null_count_fields. into ( ) )
92+ }
93+ _ => DataType :: Int64 ,
94+ }
95+ }
96+
97+ /// Compute null counts for a column.
98+ fn compute_null_counts ( column : & ArrayRef ) -> ArrayRef {
99+ match column. data_type ( ) {
100+ DataType :: Struct ( fields) => {
101+ let struct_array = column. as_any ( ) . downcast_ref :: < StructArray > ( ) . unwrap ( ) ;
102+ let children: Vec < ArrayRef > = ( 0 ..fields. len ( ) )
103+ . map ( |i| Self :: compute_null_counts ( struct_array. column ( i) ) )
104+ . collect ( ) ;
105+ let null_count_fields: Fields = fields
106+ . iter ( )
107+ . map ( |f| Field :: new ( f. name ( ) , Self :: null_count_data_type ( f. data_type ( ) ) , true ) )
108+ . collect ( ) ;
109+ Arc :: new (
110+ StructArray :: try_new ( null_count_fields, children, None )
111+ . expect ( "Failed to create null count struct" ) ,
112+ )
113+ }
114+ _ => {
115+ let null_count = column. null_count ( ) as i64 ;
116+ Arc :: new ( Int64Array :: from ( vec ! [ null_count] ) )
117+ }
118+ }
119+ }
120+
121+ /// Merge two null count structures by adding them together.
122+ fn merge_null_counts ( existing : & ArrayRef , new : & ArrayRef ) -> ArrayRef {
123+ match existing. data_type ( ) {
124+ DataType :: Struct ( fields) => {
125+ let existing_struct = existing. as_any ( ) . downcast_ref :: < StructArray > ( ) . unwrap ( ) ;
126+ let new_struct = new. as_any ( ) . downcast_ref :: < StructArray > ( ) . unwrap ( ) ;
127+
128+ let children: Vec < ArrayRef > = ( 0 ..fields. len ( ) )
129+ . map ( |i| {
130+ Self :: merge_null_counts ( existing_struct. column ( i) , new_struct. column ( i) )
131+ } )
132+ . collect ( ) ;
133+
134+ let null_count_fields: Fields = fields
135+ . iter ( )
136+ . map ( |f| Field :: new ( f. name ( ) , Self :: null_count_data_type ( f. data_type ( ) ) , true ) )
137+ . collect ( ) ;
138+ Arc :: new (
139+ StructArray :: try_new ( null_count_fields, children, None )
140+ . expect ( "Failed to merge null count struct" ) ,
141+ )
142+ }
143+ _ => {
144+ let existing_val = existing
145+ . as_any ( )
146+ . downcast_ref :: < Int64Array > ( )
147+ . unwrap ( )
148+ . value ( 0 ) ;
149+ let new_val = new. as_any ( ) . downcast_ref :: < Int64Array > ( ) . unwrap ( ) . value ( 0 ) ;
150+ Arc :: new ( Int64Array :: from ( vec ! [ existing_val + new_val] ) )
151+ }
35152 }
36153 }
37154
38155 /// Update statistics with data from a RecordBatch.
39- ///
40- /// This method accumulates statistics across multiple batches.
41156 pub ( crate ) fn update ( & mut self , batch : & RecordBatch ) -> DeltaResult < ( ) > {
42157 self . num_records += batch. num_rows ( ) as i64 ;
158+
159+ // Update null counts
160+ for ( col_idx, column) in batch. columns ( ) . iter ( ) . enumerate ( ) {
161+ let col_name = & self . column_names [ col_idx] ;
162+ if self . should_collect_stats ( col_name) {
163+ let batch_null_counts = Self :: compute_null_counts ( column) ;
164+ self . null_counts [ col_idx] =
165+ Self :: merge_null_counts ( & self . null_counts [ col_idx] , & batch_null_counts) ;
166+ }
167+ }
168+
43169 Ok ( ( ) )
44170 }
45171
46172 /// Finalize and return the collected statistics as a StructArray.
47- ///
48- /// Returns a single-row StructArray with the Delta Lake stats schema:
49- /// - numRecords: total row count
50- /// - tightBounds: true for new files (no deletion vectors applied)
51173 pub ( crate ) fn finalize ( & self ) -> DeltaResult < StructArray > {
52174 let mut fields = Vec :: new ( ) ;
53175 let mut arrays: Vec < Arc < dyn Array > > = Vec :: new ( ) ;
@@ -56,8 +178,37 @@ impl StatisticsCollector {
56178 fields. push ( Field :: new ( "numRecords" , DataType :: Int64 , true ) ) ;
57179 arrays. push ( Arc :: new ( Int64Array :: from ( vec ! [ self . num_records] ) ) ) ;
58180
59- // tightBounds - always true for new file writes
60- // (false only when deletion vectors are applied to existing files)
181+ // nullCount - nested struct matching data schema
182+ let null_count_fields: Vec < Field > = self
183+ . column_names
184+ . iter ( )
185+ . enumerate ( )
186+ . filter ( |( _, name) | self . should_collect_stats ( name) )
187+ . map ( |( idx, name) | Field :: new ( name, self . null_counts [ idx] . data_type ( ) . clone ( ) , true ) )
188+ . collect ( ) ;
189+
190+ if !null_count_fields. is_empty ( ) {
191+ let null_count_arrays: Vec < ArrayRef > = self
192+ . column_names
193+ . iter ( )
194+ . enumerate ( )
195+ . filter ( |( _, name) | self . should_collect_stats ( name) )
196+ . map ( |( idx, _) | self . null_counts [ idx] . clone ( ) )
197+ . collect ( ) ;
198+
199+ let null_count_struct =
200+ StructArray :: try_new ( null_count_fields. into ( ) , null_count_arrays, None )
201+ . map_err ( |e| Error :: generic ( format ! ( "Failed to create nullCount: {e}" ) ) ) ?;
202+
203+ fields. push ( Field :: new (
204+ "nullCount" ,
205+ null_count_struct. data_type ( ) . clone ( ) ,
206+ true ,
207+ ) ) ;
208+ arrays. push ( Arc :: new ( null_count_struct) ) ;
209+ }
210+
211+ // tightBounds
61212 fields. push ( Field :: new ( "tightBounds" , DataType :: Boolean , true ) ) ;
62213 arrays. push ( Arc :: new ( BooleanArray :: from ( vec ! [ true ] ) ) ) ;
63214
@@ -69,7 +220,7 @@ impl StatisticsCollector {
69220#[ cfg( test) ]
70221mod tests {
71222 use super :: * ;
72- use crate :: arrow:: array:: { Array , Int64Array } ;
223+ use crate :: arrow:: array:: { Array , Int64Array , StringArray } ;
73224 use crate :: arrow:: datatypes:: Schema ;
74225
75226 #[ test]
@@ -86,7 +237,6 @@ mod tests {
86237 collector. update ( & batch) . unwrap ( ) ;
87238 let stats = collector. finalize ( ) . unwrap ( ) ;
88239
89- // Check numRecords
90240 assert_eq ! ( stats. len( ) , 1 ) ;
91241 let num_records = stats
92242 . column_by_name ( "numRecords" )
@@ -95,63 +245,124 @@ mod tests {
95245 . downcast_ref :: < Int64Array > ( )
96246 . unwrap ( ) ;
97247 assert_eq ! ( num_records. value( 0 ) , 3 ) ;
248+ }
249+
250+ #[ test]
251+ fn test_statistics_collector_null_counts ( ) {
252+ let schema = Arc :: new ( Schema :: new ( vec ! [
253+ Field :: new( "id" , DataType :: Int64 , false ) ,
254+ Field :: new( "value" , DataType :: Utf8 , true ) ,
255+ ] ) ) ;
98256
99- // Check tightBounds
100- let tight_bounds = stats
101- . column_by_name ( "tightBounds" )
257+ let batch = RecordBatch :: try_new (
258+ schema. clone ( ) ,
259+ vec ! [
260+ Arc :: new( Int64Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
261+ Arc :: new( StringArray :: from( vec![ Some ( "a" ) , None , Some ( "c" ) ] ) ) ,
262+ ] ,
263+ )
264+ . unwrap ( ) ;
265+
266+ let mut collector =
267+ StatisticsCollector :: new ( schema, & [ "id" . to_string ( ) , "value" . to_string ( ) ] ) ;
268+ collector. update ( & batch) . unwrap ( ) ;
269+ let stats = collector. finalize ( ) . unwrap ( ) ;
270+
271+ // Check nullCount struct
272+ let null_count = stats
273+ . column_by_name ( "nullCount" )
274+ . unwrap ( )
275+ . as_any ( )
276+ . downcast_ref :: < StructArray > ( )
277+ . unwrap ( ) ;
278+
279+ // id has 0 nulls
280+ let id_null_count = null_count
281+ . column_by_name ( "id" )
282+ . unwrap ( )
283+ . as_any ( )
284+ . downcast_ref :: < Int64Array > ( )
285+ . unwrap ( ) ;
286+ assert_eq ! ( id_null_count. value( 0 ) , 0 ) ;
287+
288+ // value has 1 null
289+ let value_null_count = null_count
290+ . column_by_name ( "value" )
102291 . unwrap ( )
103292 . as_any ( )
104- . downcast_ref :: < BooleanArray > ( )
293+ . downcast_ref :: < Int64Array > ( )
105294 . unwrap ( ) ;
106- assert ! ( tight_bounds . value( 0 ) ) ;
295+ assert_eq ! ( value_null_count . value( 0 ) , 1 ) ;
107296 }
108297
109298 #[ test]
110- fn test_statistics_collector_multiple_batches ( ) {
111- let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id " , DataType :: Int64 , false ) ] ) ) ;
299+ fn test_statistics_collector_multiple_batches_null_counts ( ) {
300+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "value " , DataType :: Utf8 , true ) ] ) ) ;
112301
113302 let batch1 = RecordBatch :: try_new (
114303 schema. clone ( ) ,
115- vec ! [ Arc :: new( Int64Array :: from( vec![ 1 , 2 , 3 ] ) ) ] ,
304+ vec ! [ Arc :: new( StringArray :: from( vec![ Some ( "a" ) , None ] ) ) ] ,
116305 )
117306 . unwrap ( ) ;
118307
119- let batch2 =
120- RecordBatch :: try_new ( schema. clone ( ) , vec ! [ Arc :: new( Int64Array :: from( vec![ 4 , 5 ] ) ) ] )
121- . unwrap ( ) ;
308+ let batch2 = RecordBatch :: try_new (
309+ schema. clone ( ) ,
310+ vec ! [ Arc :: new( StringArray :: from( vec![ None , None , Some ( "b" ) ] ) ) ] ,
311+ )
312+ . unwrap ( ) ;
122313
123- let mut collector = StatisticsCollector :: new ( schema, & [ "id " . to_string ( ) ] ) ;
314+ let mut collector = StatisticsCollector :: new ( schema, & [ "value " . to_string ( ) ] ) ;
124315 collector. update ( & batch1) . unwrap ( ) ;
125316 collector. update ( & batch2) . unwrap ( ) ;
126317 let stats = collector. finalize ( ) . unwrap ( ) ;
127318
128- let num_records = stats
129- . column_by_name ( "numRecords" )
319+ let null_count = stats
320+ . column_by_name ( "nullCount" )
321+ . unwrap ( )
322+ . as_any ( )
323+ . downcast_ref :: < StructArray > ( )
324+ . unwrap ( ) ;
325+
326+ let value_null_count = null_count
327+ . column_by_name ( "value" )
130328 . unwrap ( )
131329 . as_any ( )
132330 . downcast_ref :: < Int64Array > ( )
133331 . unwrap ( ) ;
134- assert_eq ! ( num_records. value( 0 ) , 5 ) ;
332+ // 1 null in batch1 + 2 nulls in batch2 = 3 total
333+ assert_eq ! ( value_null_count. value( 0 ) , 3 ) ;
135334 }
136335
137336 #[ test]
138- fn test_statistics_collector_empty_batch ( ) {
139- let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new( "id" , DataType :: Int64 , false ) ] ) ) ;
337+ fn test_statistics_collector_respects_stats_columns ( ) {
338+ let schema = Arc :: new ( Schema :: new ( vec ! [
339+ Field :: new( "id" , DataType :: Int64 , false ) ,
340+ Field :: new( "value" , DataType :: Utf8 , true ) ,
341+ ] ) ) ;
140342
141- let empty: Vec < i64 > = vec ! [ ] ;
142- let batch =
143- RecordBatch :: try_new ( schema. clone ( ) , vec ! [ Arc :: new( Int64Array :: from( empty) ) ] ) . unwrap ( ) ;
343+ let batch = RecordBatch :: try_new (
344+ schema. clone ( ) ,
345+ vec ! [
346+ Arc :: new( Int64Array :: from( vec![ 1 , 2 , 3 ] ) ) ,
347+ Arc :: new( StringArray :: from( vec![ Some ( "a" ) , None , Some ( "c" ) ] ) ) ,
348+ ] ,
349+ )
350+ . unwrap ( ) ;
144351
145- let mut collector = StatisticsCollector :: new ( schema, & [ ] ) ;
352+ // Only collect stats for "id", not "value"
353+ let mut collector = StatisticsCollector :: new ( schema, & [ "id" . to_string ( ) ] ) ;
146354 collector. update ( & batch) . unwrap ( ) ;
147355 let stats = collector. finalize ( ) . unwrap ( ) ;
148356
149- let num_records = stats
150- . column_by_name ( "numRecords " )
357+ let null_count = stats
358+ . column_by_name ( "nullCount " )
151359 . unwrap ( )
152360 . as_any ( )
153- . downcast_ref :: < Int64Array > ( )
361+ . downcast_ref :: < StructArray > ( )
154362 . unwrap ( ) ;
155- assert_eq ! ( num_records. value( 0 ) , 0 ) ;
363+
364+ // Only id should be present
365+ assert ! ( null_count. column_by_name( "id" ) . is_some( ) ) ;
366+ assert ! ( null_count. column_by_name( "value" ) . is_none( ) ) ;
156367 }
157368}
0 commit comments