
Commit 5c2a74d

feat: add StatisticsCollector core with numRecords
- Add StatisticsCollector struct with new(), update(), finalize()
- Track numRecords across multiple RecordBatches
- Output StructArray with {numRecords, tightBounds}
- Basic unit tests for single/multiple batches

This is the foundation for full stats collection; column-level stats (nullCount, minValues, maxValues) will be added in subsequent PRs.
1 parent: 2630a7a
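The per-file stats this commit emits are a single-row Arrow StructArray with a numRecords field and a tightBounds field. A minimal, standalone sketch of that shape (using the arrow crate directly and a hypothetical per_file_stats helper; the actual implementation is collect_stats in kernel/src/engine/default/stats.rs below):

use std::sync::Arc;

use arrow::array::{Array, BooleanArray, Int64Array, StructArray};
use arrow::datatypes::{DataType, Field};

// Single-row stats struct: {numRecords: Int64, tightBounds: Boolean}.
fn per_file_stats(num_records: i64) -> StructArray {
    let fields = vec![
        Field::new("numRecords", DataType::Int64, true),
        Field::new("tightBounds", DataType::Boolean, true),
    ];
    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(Int64Array::from(vec![num_records])),
        // Freshly written files always carry exact (tight) bounds.
        Arc::new(BooleanArray::from(vec![true])),
    ];
    StructArray::try_new(fields.into(), columns, None).expect("two fields, one row each")
}

In the Delta log these values end up serialized into the add action's stats string, e.g. "{\"numRecords\":3,\"tightBounds\":true}" in the kernel/tests/write.rs expectations below.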

File tree: 4 files changed, +195 −40 lines

- kernel/src/engine/default/mod.rs
- kernel/src/engine/default/parquet.rs
- kernel/src/engine/default/stats.rs
- kernel/tests/write.rs

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ pub mod file_stream;
 pub mod filesystem;
 pub mod json;
 pub mod parquet;
+pub mod stats;
 pub mod storage;
 
 /// Converts a Stream-producing future to a synchronous iterator.

kernel/src/engine/default/parquet.rs

Lines changed: 98 additions & 36 deletions
@@ -7,8 +7,8 @@ use std::sync::Arc;
 use delta_kernel_derive::internal_api;
 
 use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
-use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
-use crate::arrow::datatypes::{DataType, Field};
+use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray, StructArray};
+use crate::arrow::datatypes::Field;
 use crate::parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
 };
@@ -23,6 +23,7 @@ use object_store::{DynObjectStore, ObjectStore};
 use uuid::Uuid;
 
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use super::stats::collect_stats;
 use super::UrlExt;
 use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
 use crate::engine::arrow_data::ArrowEngineData;
@@ -54,13 +55,16 @@ pub struct DataFileMetadata {
     file_meta: FileMeta,
     // NB: We use usize instead of u64 since arrow uses usize for record batch sizes
     num_records: usize,
+    /// Collected statistics for this file.
+    stats: StructArray,
 }
 
 impl DataFileMetadata {
-    pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
+    pub fn new(file_meta: FileMeta, num_records: usize, stats: StructArray) -> Self {
         Self {
             file_meta,
             num_records,
+            stats,
         }
     }
 
@@ -80,7 +84,8 @@ impl DataFileMetadata {
                     last_modified,
                     size,
                 },
-            num_records,
+            stats,
+            ..
         } = self;
         // create the record batch of the write metadata
         let path = Arc::new(StringArray::from(vec![location.to_string()]));
@@ -104,20 +109,43 @@
             .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
         let size = Arc::new(Int64Array::from(vec![size]));
         let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
-        let stats = Arc::new(StructArray::try_new_with_length(
-            vec![Field::new("numRecords", DataType::Int64, true)].into(),
-            vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
-            None,
-            1,
-        )?);
 
-        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
-            Arc::new(
-                crate::transaction::BASE_ADD_FILES_SCHEMA
-                    .as_ref()
-                    .try_into_arrow()?,
+        let stats_array = Arc::new(stats.clone());
+
+        // Build schema dynamically based on stats
+        let stats_field = Field::new("stats", stats_array.data_type().clone(), true);
+        let schema = crate::arrow::datatypes::Schema::new(vec![
+            Field::new("path", crate::arrow::datatypes::DataType::Utf8, false),
+            Field::new(
+                "partitionValues",
+                crate::arrow::datatypes::DataType::Map(
+                    Arc::new(Field::new(
+                        "key_value",
+                        crate::arrow::datatypes::DataType::Struct(
+                            vec![
+                                Field::new("key", crate::arrow::datatypes::DataType::Utf8, false),
+                                Field::new("value", crate::arrow::datatypes::DataType::Utf8, true),
+                            ]
+                            .into(),
+                        ),
+                        false,
+                    )),
+                    false,
+                ),
+                false,
+            ),
+            Field::new("size", crate::arrow::datatypes::DataType::Int64, false),
+            Field::new(
+                "modificationTime",
+                crate::arrow::datatypes::DataType::Int64,
+                false,
             ),
-            vec![path, partitions, size, modification_time, stats],
+            stats_field,
+        ]);
+
+        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
+            Arc::new(schema),
+            vec![path, partitions, size, modification_time, stats_array],
         )?)))
     }
 }
@@ -148,11 +176,15 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         &self,
         path: &url::Url,
         data: Box<dyn EngineData>,
+        stats_columns: &[String],
     ) -> DeltaResult<DataFileMetadata> {
         let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
         let record_batch = batch.record_batch();
         let num_records = record_batch.num_rows();
 
+        // Collect statistics before writing
+        let stats = collect_stats(record_batch, stats_columns)?;
+
         let mut buffer = vec![];
         let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
         writer.write(record_batch)?;
@@ -185,7 +217,7 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         }
 
         let file_meta = FileMeta::new(path, modification_time, size);
-        Ok(DataFileMetadata::new(file_meta, num_records))
+        Ok(DataFileMetadata::new(file_meta, num_records, stats))
     }
 
     /// Write `data` to `{path}/<uuid>.parquet` as parquet using ArrowWriter and return the parquet
@@ -201,9 +233,11 @@
         path: &url::Url,
         data: Box<dyn EngineData>,
        partition_values: HashMap<String, String>,
-        _stats_columns: Option<&[String]>,
+        stats_columns: Option<&[String]>,
     ) -> DeltaResult<Box<dyn EngineData>> {
-        let parquet_metadata = self.write_parquet(path, data).await?;
+        let parquet_metadata = self
+            .write_parquet(path, data, stats_columns.unwrap_or(&[]))
+            .await?;
         parquet_metadata.as_record_batch(&partition_values)
     }
 }
@@ -609,19 +643,26 @@ mod tests {
         let last_modified = 10000000000;
         let num_records = 10;
         let file_metadata = FileMeta::new(location.clone(), last_modified, size);
-        let data_file_metadata = DataFileMetadata::new(file_metadata, num_records);
+        let stats = StructArray::try_new(
+            vec![
+                Field::new("numRecords", ArrowDataType::Int64, true),
+                Field::new("tightBounds", ArrowDataType::Boolean, true),
+            ]
+            .into(),
+            vec![
+                Arc::new(Int64Array::from(vec![num_records as i64])),
+                Arc::new(BooleanArray::from(vec![true])),
+            ],
+            None,
+        )
+        .unwrap();
+        let data_file_metadata = DataFileMetadata::new(file_metadata, num_records, stats.clone());
         let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
         let actual = data_file_metadata
            .as_record_batch(&partition_values)
            .unwrap();
         let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
 
-        let schema = Arc::new(
-            crate::transaction::BASE_ADD_FILES_SCHEMA
-                .as_ref()
-                .try_into_arrow()
-                .unwrap(),
-        );
         let mut partition_values_builder = MapBuilder::new(
             Some(MapFieldNames {
                 entry: "key_value".to_string(),
@@ -635,13 +676,33 @@
         partition_values_builder.values().append_value("a");
         partition_values_builder.append(true).unwrap();
         let partition_values = partition_values_builder.finish();
-        let stats_struct = StructArray::try_new_with_length(
-            vec![Field::new("numRecords", ArrowDataType::Int64, true)].into(),
-            vec![Arc::new(Int64Array::from(vec![num_records as i64]))],
-            None,
-            1,
-        )
-        .unwrap();
+
+        // Build expected schema dynamically based on stats
+        let stats_field = Field::new("stats", stats.data_type().clone(), true);
+        let schema = Arc::new(crate::arrow::datatypes::Schema::new(vec![
+            Field::new("path", ArrowDataType::Utf8, false),
+            Field::new(
+                "partitionValues",
+                ArrowDataType::Map(
+                    Arc::new(Field::new(
+                        "key_value",
+                        ArrowDataType::Struct(
+                            vec![
+                                Field::new("key", ArrowDataType::Utf8, false),
+                                Field::new("value", ArrowDataType::Utf8, true),
+                            ]
+                            .into(),
+                        ),
+                        false,
+                    )),
+                    false,
+                ),
+                false,
+            ),
+            Field::new("size", ArrowDataType::Int64, false),
+            Field::new("modificationTime", ArrowDataType::Int64, false),
+            stats_field,
+        ]));
 
         let expected = RecordBatch::try_new(
             schema,
@@ -650,7 +711,7 @@
                 Arc::new(partition_values),
                 Arc::new(Int64Array::from(vec![size as i64])),
                 Arc::new(Int64Array::from(vec![last_modified])),
-                Arc::new(stats_struct),
+                Arc::new(stats),
             ],
         )
         .unwrap();
@@ -673,7 +734,7 @@
         ));
 
         let write_metadata = parquet_handler
-            .write_parquet(&Url::parse("memory:///data/").unwrap(), data)
+            .write_parquet(&Url::parse("memory:///data/").unwrap(), data, &[])
            .await
            .unwrap();
 
@@ -685,6 +746,7 @@
                     size,
                 },
             num_records,
+            ..
         } = write_metadata;
         let expected_location = Url::parse("memory:///data/").unwrap();
 
@@ -744,7 +806,7 @@
 
         assert_result_error_with_message(
             parquet_handler
-                .write_parquet(&Url::parse("memory:///data").unwrap(), data)
+                .write_parquet(&Url::parse("memory:///data").unwrap(), data, &[])
                .await,
             "Generic delta kernel error: Path must end with a trailing slash: memory:///data",
         );
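A note on the as_record_batch change above: the write-metadata schema is no longer taken from the static BASE_ADD_FILES_SCHEMA; the "stats" field's Arrow type is copied from the collected StructArray, so the schema tracks whatever stats fields were actually collected. A rough sketch of that idea with a deliberately trimmed-down schema (standalone arrow, hypothetical write_metadata_batch helper; the real batch also carries partitionValues, size, and modificationTime):

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, RecordBatch, StringArray, StructArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;

// The "stats" column's type is derived from whatever stats were collected,
// so adding fields to the stats struct never requires touching this schema.
fn write_metadata_batch(path: &str, stats: StructArray) -> Result<RecordBatch, ArrowError> {
    let schema = Schema::new(vec![
        Field::new("path", DataType::Utf8, false),
        Field::new("stats", stats.data_type().clone(), true),
    ]);
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec![path])),
        Arc::new(stats),
    ];
    RecordBatch::try_new(Arc::new(schema), columns)
}

This is also why the test above builds its expected schema from stats.data_type() instead of a fixed schema.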

kernel/src/engine/default/stats.rs

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+//! Statistics collection for Delta Lake file writes.
+//!
+//! Provides `collect_stats` to compute statistics for a RecordBatch during file writes.
+
+use std::sync::Arc;
+
+use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
+use crate::arrow::datatypes::{DataType, Field};
+use crate::{DeltaResult, Error};
+
+/// Collect statistics from a RecordBatch for Delta Lake file statistics.
+///
+/// Returns a StructArray with the following fields:
+/// - `numRecords`: total row count
+/// - `tightBounds`: always true for new file writes
+///
+/// # Arguments
+/// * `batch` - The RecordBatch to collect statistics from
+/// * `_stats_columns` - Column names that should have statistics collected (reserved for future use)
+pub(crate) fn collect_stats(
+    batch: &RecordBatch,
+    _stats_columns: &[String],
+) -> DeltaResult<StructArray> {
+    let mut fields = Vec::new();
+    let mut arrays: Vec<Arc<dyn Array>> = Vec::new();
+
+    // numRecords
+    fields.push(Field::new("numRecords", DataType::Int64, true));
+    arrays.push(Arc::new(Int64Array::from(vec![batch.num_rows() as i64])));
+
+    // tightBounds - always true for new file writes
+    fields.push(Field::new("tightBounds", DataType::Boolean, true));
+    arrays.push(Arc::new(BooleanArray::from(vec![true])));
+
+    StructArray::try_new(fields.into(), arrays, None)
+        .map_err(|e| Error::generic(format!("Failed to create stats struct: {e}")))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array::Int64Array;
+    use crate::arrow::datatypes::Schema;
+
+    #[test]
+    fn test_collect_stats_basic() {
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
+
+        let stats = collect_stats(&batch, &["id".to_string()]).unwrap();
+
+        assert_eq!(stats.len(), 1);
+
+        // Check numRecords
+        let num_records = stats
+            .column_by_name("numRecords")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(num_records.value(0), 3);
+
+        // Check tightBounds
+        let tight_bounds = stats
+            .column_by_name("tightBounds")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<BooleanArray>()
+            .unwrap();
+        assert!(tight_bounds.value(0));
+    }
+
+    #[test]
+    fn test_collect_stats_empty_batch() {
+        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
+
+        let empty: Vec<i64> = vec![];
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(empty))]).unwrap();
+
+        let stats = collect_stats(&batch, &[]).unwrap();
+
+        let num_records = stats
+            .column_by_name("numRecords")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(num_records.value(0), 0);
+    }
+}
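The `_stats_columns` parameter is accepted but unused for now; per the commit message, column-level stats (nullCount, minValues, maxValues) arrive in later PRs. Purely as a hypothetical illustration of what a follow-up might do with it, here is a sketch that computes a per-column null count (standalone arrow, illustrative names; in the Delta stats format these live under a nullCount struct keyed by column name):

use std::sync::Arc;

use arrow::array::{Array, Int64Array, RecordBatch, StructArray};
use arrow::datatypes::{DataType, Field, Fields};

// Hypothetical follow-up sketch: one Int64 null count per requested stats column.
fn null_counts(batch: &RecordBatch, stats_columns: &[String]) -> Option<StructArray> {
    if stats_columns.is_empty() {
        return None;
    }
    let mut fields = Vec::new();
    let mut columns: Vec<Arc<dyn Array>> = Vec::new();
    for name in stats_columns {
        // Bail out if a requested column does not exist; a real implementation
        // would likely surface an error instead.
        let col = batch.column_by_name(name)?;
        fields.push(Field::new(name.as_str(), DataType::Int64, true));
        columns.push(Arc::new(Int64Array::from(vec![col.null_count() as i64])));
    }
    StructArray::try_new(Fields::from(fields), columns, None).ok()
}

Nothing in this commit does this yet; collect_stats currently ignores the parameter.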

kernel/tests/write.rs

Lines changed: 4 additions & 4 deletions
@@ -393,7 +393,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
                 "size": size,
                 "modificationTime": 0,
                 "dataChange": true,
-                "stats": "{\"numRecords\":3}"
+                "stats": "{\"numRecords\":3,\"tightBounds\":true}"
             }
         }),
         json!({
@@ -403,7 +403,7 @@
                 "size": size,
                 "modificationTime": 0,
                 "dataChange": true,
-                "stats": "{\"numRecords\":3}"
+                "stats": "{\"numRecords\":3,\"tightBounds\":true}"
             }
         }),
     ];
@@ -601,7 +601,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
                 "size": size,
                 "modificationTime": 0,
                 "dataChange": false,
-                "stats": "{\"numRecords\":3}"
+                "stats": "{\"numRecords\":3,\"tightBounds\":true}"
             }
         }),
         json!({
@@ -613,7 +613,7 @@
                 "size": size,
                 "modificationTime": 0,
                 "dataChange": false,
-                "stats": "{\"numRecords\":3}"
+                "stats": "{\"numRecords\":3,\"tightBounds\":true}"
             }
         }),
     ];
