
Commit e361bff

feat: add StatisticsCollector core with numRecords
- Add StatisticsCollector struct with new(), update(), finalize()
- Track numRecords across multiple RecordBatches
- Output StructArray with {numRecords, tightBounds}
- Basic unit tests for single/multiple batches

This is the foundation for full stats collection; column-level stats (nullCount, minValues, maxValues) will be added in subsequent PRs.
1 parent 2630a7a commit e361bff
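
For orientation, the {numRecords, tightBounds} struct collected per written file is the in-memory counterpart of the JSON stats string that a Delta add action carries. A minimal, illustrative sketch of that correspondence (the helper name and values are made up; stats serialization is not part of this commit):

```rust
// Illustrative only: roughly how the collected struct maps onto the Delta log's
// `add.stats` JSON string. The helper name and values here are hypothetical.
fn example_stats_json(num_records: i64, tight_bounds: bool) -> String {
    format!(r#"{{"numRecords":{num_records},"tightBounds":{tight_bounds}}}"#)
}

fn main() {
    // Prints: {"numRecords":3,"tightBounds":true}
    println!("{}", example_stats_json(3, true));
}
```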

4 files changed: +196 additions, −51 deletions

kernel/src/engine/default/mod.rs

Lines changed: 1 addition & 0 deletions
```diff
@@ -33,6 +33,7 @@ pub mod file_stream;
 pub mod filesystem;
 pub mod json;
 pub mod parquet;
+pub mod stats;
 pub mod storage;
 
 /// Converts a Stream-producing future to a synchronous iterator.
```

kernel/src/engine/default/parquet.rs

Lines changed: 99 additions & 47 deletions
```diff
@@ -7,8 +7,8 @@ use std::sync::Arc;
 use delta_kernel_derive::internal_api;
 
 use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
-use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
-use crate::arrow::datatypes::{DataType, Field};
+use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray, StructArray};
+use crate::arrow::datatypes::{DataType, Field, Schema};
 use crate::parquet::arrow::arrow_reader::{
     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
 };
@@ -23,6 +23,7 @@ use object_store::{DynObjectStore, ObjectStore};
 use uuid::Uuid;
 
 use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
+use super::stats::collect_stats;
 use super::UrlExt;
 use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
 use crate::engine::arrow_data::ArrowEngineData;
@@ -46,22 +47,16 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {
 }
 
 /// Metadata of a data file (typically a parquet file).
-///
-/// Currently just includes the the number of records as statistics, but will expand to include
-/// more statistics and other metadata in the future.
 #[derive(Debug)]
 pub struct DataFileMetadata {
     file_meta: FileMeta,
-    // NB: We use usize instead of u64 since arrow uses usize for record batch sizes
-    num_records: usize,
+    /// Collected statistics for this file (includes numRecords, tightBounds, etc.).
+    stats: StructArray,
 }
 
 impl DataFileMetadata {
-    pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
-        Self {
-            file_meta,
-            num_records,
-        }
+    pub fn new(file_meta: FileMeta, stats: StructArray) -> Self {
+        Self { file_meta, stats }
     }
 
     /// Convert DataFileMetadata into a record batch which matches the schema returned by
@@ -80,7 +75,8 @@ impl DataFileMetadata {
                     last_modified,
                     size,
                 },
-            num_records,
+            stats,
+            ..
         } = self;
         // create the record batch of the write metadata
         let path = Arc::new(StringArray::from(vec![location.to_string()]));
@@ -104,20 +100,35 @@ impl DataFileMetadata {
             .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
         let size = Arc::new(Int64Array::from(vec![size]));
         let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
-        let stats = Arc::new(StructArray::try_new_with_length(
-            vec![Field::new("numRecords", DataType::Int64, true)].into(),
-            vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
-            None,
-            1,
-        )?);
 
-        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
-            Arc::new(
-                crate::transaction::BASE_ADD_FILES_SCHEMA
-                    .as_ref()
-                    .try_into_arrow()?,
+        let stats_array = Arc::new(stats.clone());
+
+        // Build schema dynamically based on stats (stats schema varies based on collected statistics)
+        let key_value_struct = DataType::Struct(
+            vec![
+                Field::new("key", DataType::Utf8, false),
+                Field::new("value", DataType::Utf8, true),
+            ]
+            .into(),
+        );
+        let schema = Schema::new(vec![
+            Field::new("path", DataType::Utf8, false),
+            Field::new(
+                "partitionValues",
+                DataType::Map(
+                    Arc::new(Field::new("key_value", key_value_struct, false)),
+                    false,
+                ),
+                false,
             ),
-            vec![path, partitions, size, modification_time, stats],
+            Field::new("size", DataType::Int64, false),
+            Field::new("modificationTime", DataType::Int64, false),
+            Field::new("stats", stats_array.data_type().clone(), true),
+        ]);
+
+        Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
+            Arc::new(schema),
+            vec![path, partitions, size, modification_time, stats_array],
         )?)))
     }
 }
@@ -148,10 +159,13 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         &self,
         path: &url::Url,
         data: Box<dyn EngineData>,
+        stats_columns: &[String],
     ) -> DeltaResult<DataFileMetadata> {
         let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
         let record_batch = batch.record_batch();
-        let num_records = record_batch.num_rows();
+
+        // Collect statistics before writing (includes numRecords)
+        let stats = collect_stats(record_batch, stats_columns)?;
 
         let mut buffer = vec![];
         let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
@@ -185,7 +199,7 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         }
 
         let file_meta = FileMeta::new(path, modification_time, size);
-        Ok(DataFileMetadata::new(file_meta, num_records))
+        Ok(DataFileMetadata::new(file_meta, stats))
     }
 
     /// Write `data` to `{path}/<uuid>.parquet` as parquet using ArrowWriter and return the parquet
@@ -201,9 +215,11 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
         path: &url::Url,
         data: Box<dyn EngineData>,
         partition_values: HashMap<String, String>,
-        _stats_columns: Option<&[String]>,
+        stats_columns: Option<&[String]>,
     ) -> DeltaResult<Box<dyn EngineData>> {
-        let parquet_metadata = self.write_parquet(path, data).await?;
+        let parquet_metadata = self
+            .write_parquet(path, data, stats_columns.unwrap_or(&[]))
+            .await?;
         parquet_metadata.as_record_batch(&partition_values)
     }
 }
@@ -609,19 +625,26 @@ mod tests {
         let last_modified = 10000000000;
         let num_records = 10;
         let file_metadata = FileMeta::new(location.clone(), last_modified, size);
-        let data_file_metadata = DataFileMetadata::new(file_metadata, num_records);
+        let stats = StructArray::try_new(
+            vec![
+                Field::new("numRecords", ArrowDataType::Int64, true),
+                Field::new("tightBounds", ArrowDataType::Boolean, true),
+            ]
+            .into(),
+            vec![
+                Arc::new(Int64Array::from(vec![num_records as i64])),
+                Arc::new(BooleanArray::from(vec![true])),
+            ],
+            None,
+        )
+        .unwrap();
+        let data_file_metadata = DataFileMetadata::new(file_metadata, stats.clone());
         let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
         let actual = data_file_metadata
            .as_record_batch(&partition_values)
            .unwrap();
         let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
 
-        let schema = Arc::new(
-            crate::transaction::BASE_ADD_FILES_SCHEMA
-                .as_ref()
-                .try_into_arrow()
-                .unwrap(),
-        );
         let mut partition_values_builder = MapBuilder::new(
             Some(MapFieldNames {
                 entry: "key_value".to_string(),
@@ -635,13 +658,33 @@ mod tests {
         partition_values_builder.values().append_value("a");
         partition_values_builder.append(true).unwrap();
         let partition_values = partition_values_builder.finish();
-        let stats_struct = StructArray::try_new_with_length(
-            vec![Field::new("numRecords", ArrowDataType::Int64, true)].into(),
-            vec![Arc::new(Int64Array::from(vec![num_records as i64]))],
-            None,
-            1,
-        )
-        .unwrap();
+
+        // Build expected schema dynamically based on stats
+        let stats_field = Field::new("stats", stats.data_type().clone(), true);
+        let schema = Arc::new(crate::arrow::datatypes::Schema::new(vec![
+            Field::new("path", ArrowDataType::Utf8, false),
+            Field::new(
+                "partitionValues",
+                ArrowDataType::Map(
+                    Arc::new(Field::new(
+                        "key_value",
+                        ArrowDataType::Struct(
+                            vec![
+                                Field::new("key", ArrowDataType::Utf8, false),
+                                Field::new("value", ArrowDataType::Utf8, true),
+                            ]
+                            .into(),
+                        ),
+                        false,
+                    )),
+                    false,
+                ),
+                false,
+            ),
+            Field::new("size", ArrowDataType::Int64, false),
+            Field::new("modificationTime", ArrowDataType::Int64, false),
+            stats_field,
+        ]));
 
         let expected = RecordBatch::try_new(
             schema,
@@ -650,7 +693,7 @@
                 Arc::new(partition_values),
                 Arc::new(Int64Array::from(vec![size as i64])),
                 Arc::new(Int64Array::from(vec![last_modified])),
-                Arc::new(stats_struct),
+                Arc::new(stats),
             ],
         )
         .unwrap();
@@ -673,7 +716,7 @@
         ));
 
         let write_metadata = parquet_handler
-            .write_parquet(&Url::parse("memory:///data/").unwrap(), data)
+            .write_parquet(&Url::parse("memory:///data/").unwrap(), data, &[])
             .await
             .unwrap();
 
@@ -684,7 +727,7 @@
                     last_modified,
                     size,
                 },
-            num_records,
+            ref stats,
         } = write_metadata;
         let expected_location = Url::parse("memory:///data/").unwrap();
 
@@ -702,6 +745,15 @@
         assert_eq!(&expected_location.join(filename).unwrap(), location);
        assert_eq!(expected_size, size);
        assert!(now - last_modified < 10_000);
+
+        // Check numRecords from stats
+        let num_records = stats
+            .column_by_name("numRecords")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap()
+            .value(0);
         assert_eq!(num_records, 3);
 
         // check we can read back
@@ -744,7 +796,7 @@
 
         assert_result_error_with_message(
             parquet_handler
-                .write_parquet(&Url::parse("memory:///data").unwrap(), data)
+                .write_parquet(&Url::parse("memory:///data").unwrap(), data, &[])
                 .await,
             "Generic delta kernel error: Path must end with a trailing slash: memory:///data",
         );
```
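
A hedged consumer-side sketch of the new write-metadata shape: the batch returned by as_record_batch now carries a stats struct column whose layout follows whatever collect_stats produced, so a caller can pull numRecords back out of it. The helper below is hypothetical (not part of this commit) and assumes it lives inside the kernel crate so the crate::arrow re-exports resolve:

```rust
use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};

/// Hypothetical helper: read (numRecords, tightBounds) for one file row out of
/// the write-metadata batch produced by DataFileMetadata::as_record_batch.
fn stats_for_row(metadata_batch: &RecordBatch, row: usize) -> Option<(i64, bool)> {
    let stats = metadata_batch
        .column_by_name("stats")?
        .as_any()
        .downcast_ref::<StructArray>()?;
    let num_records = stats
        .column_by_name("numRecords")?
        .as_any()
        .downcast_ref::<Int64Array>()?
        .value(row);
    let tight_bounds = stats
        .column_by_name("tightBounds")?
        .as_any()
        .downcast_ref::<BooleanArray>()?
        .value(row);
    Some((num_records, tight_bounds))
}
```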

kernel/src/engine/default/stats.rs

Lines changed: 92 additions & 0 deletions
```rust
//! Statistics collection for Delta Lake file writes.
//!
//! Provides `collect_stats` to compute statistics for a RecordBatch during file writes.

use std::sync::Arc;

use crate::arrow::array::{Array, BooleanArray, Int64Array, RecordBatch, StructArray};
use crate::arrow::datatypes::{DataType, Field};
use crate::{DeltaResult, Error};

/// Collect statistics from a RecordBatch for Delta Lake file statistics.
///
/// Returns a StructArray with the following fields:
/// - `numRecords`: total row count
/// - `tightBounds`: always true for new file writes
///
/// # Arguments
/// * `batch` - The RecordBatch to collect statistics from
/// * `_stats_columns` - Column names that should have statistics collected (reserved for future use)
pub(crate) fn collect_stats(
    batch: &RecordBatch,
    _stats_columns: &[String],
) -> DeltaResult<StructArray> {
    let mut fields = Vec::new();
    let mut arrays: Vec<Arc<dyn Array>> = Vec::new();

    // numRecords
    fields.push(Field::new("numRecords", DataType::Int64, true));
    arrays.push(Arc::new(Int64Array::from(vec![batch.num_rows() as i64])));

    // tightBounds - always true for new file writes
    fields.push(Field::new("tightBounds", DataType::Boolean, true));
    arrays.push(Arc::new(BooleanArray::from(vec![true])));

    StructArray::try_new(fields.into(), arrays, None)
        .map_err(|e| Error::generic(format!("Failed to create stats struct: {e}")))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array::Int64Array;
    use crate::arrow::datatypes::Schema;

    #[test]
    fn test_collect_stats_basic() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();

        let stats = collect_stats(&batch, &["id".to_string()]).unwrap();

        assert_eq!(stats.len(), 1);

        // Check numRecords
        let num_records = stats
            .column_by_name("numRecords")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(num_records.value(0), 3);

        // Check tightBounds
        let tight_bounds = stats
            .column_by_name("tightBounds")
            .unwrap()
            .as_any()
            .downcast_ref::<BooleanArray>()
            .unwrap();
        assert!(tight_bounds.value(0));
    }

    #[test]
    fn test_collect_stats_empty_batch() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

        let empty: Vec<i64> = vec![];
        let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(empty))]).unwrap();

        let stats = collect_stats(&batch, &[]).unwrap();

        let num_records = stats
            .column_by_name("numRecords")
            .unwrap()
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(num_records.value(0), 0);
    }
}
```
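
The commit message points at column-level stats (nullCount, minValues, maxValues) landing in later PRs. As a hedged sketch of how the currently unused stats_columns parameter could start being used, here is one possible nullCount builder in the same style as collect_stats; it reuses the imports already at the top of stats.rs, assumes a non-empty stats_columns list, and is not part of this commit:

```rust
// Hypothetical follow-up sketch: per-column null counts, shaped as a nested
// struct so it could sit next to numRecords/tightBounds in the stats StructArray.
pub(crate) fn collect_null_counts(
    batch: &RecordBatch,
    stats_columns: &[String],
) -> DeltaResult<StructArray> {
    let mut fields = Vec::new();
    let mut arrays: Vec<Arc<dyn Array>> = Vec::new();

    for name in stats_columns {
        let column = batch
            .column_by_name(name)
            .ok_or_else(|| Error::generic(format!("stats column not found: {name}")))?;
        // Arrow tracks nulls per array, so the count is already available.
        fields.push(Field::new(name.clone(), DataType::Int64, true));
        arrays.push(Arc::new(Int64Array::from(vec![column.null_count() as i64])));
    }

    StructArray::try_new(fields.into(), arrays, None)
        .map_err(|e| Error::generic(format!("Failed to create nullCount struct: {e}")))
}
```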
