Skip to content

Commit 4cfcc4c

Browse files
committed
feat: add StatsVerifier and commit-time stats validation
- Add StatsVerifier with verify() and verify_detailed() methods - Add StatsValidationVisitor using RowVisitor pattern - Integrate validation into commit() flow - Add add_files_validated() and validate_add_files_stats() helpers - Unit tests for verifier functionality
1 parent 6c7e071 commit 4cfcc4c

File tree

2 files changed

+277
-0
lines changed

2 files changed

+277
-0
lines changed

kernel/src/engine/default/stats.rs

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,3 +1118,182 @@ mod tests {
11181118
assert_eq!(value_null_count.value(0), 1);
11191119
}
11201120
}
1121+
1122+
/// Verifies that collected statistics match the expected schema.
1123+
/// Used for debugging and testing stats collection.
1124+
pub(crate) struct StatsVerifier;
1125+
1126+
impl StatsVerifier {
1127+
/// Verify stats and return a structured result.
1128+
#[allow(unused)]
1129+
pub(crate) fn verify(
1130+
stats: &StructArray,
1131+
expected_columns: &[String],
1132+
) -> DeltaResult<StatsVerificationResult> {
1133+
use crate::arrow::array::Array;
1134+
1135+
let fields = stats.fields();
1136+
let field_names: Vec<&str> = fields.iter().map(|f| f.name().as_str()).collect();
1137+
1138+
// Check numRecords
1139+
let num_records = if field_names.contains(&"numRecords") {
1140+
let num_records_idx = fields
1141+
.iter()
1142+
.position(|f| f.name() == "numRecords")
1143+
.unwrap();
1144+
let num_records_array = stats.column(num_records_idx);
1145+
if let Some(int_array) = num_records_array.as_any().downcast_ref::<Int64Array>() {
1146+
int_array.value(0)
1147+
} else {
1148+
0
1149+
}
1150+
} else {
1151+
0
1152+
};
1153+
1154+
// Check tightBounds
1155+
let tight_bounds = if field_names.contains(&"tightBounds") {
1156+
let idx = fields
1157+
.iter()
1158+
.position(|f| f.name() == "tightBounds")
1159+
.unwrap();
1160+
let array = stats.column(idx);
1161+
if let Some(bool_array) = array.as_any().downcast_ref::<BooleanArray>() {
1162+
bool_array.value(0)
1163+
} else {
1164+
false
1165+
}
1166+
} else {
1167+
false
1168+
};
1169+
1170+
// Check nullCount columns
1171+
let mut present_null_count = Vec::new();
1172+
let mut missing_null_count = Vec::new();
1173+
1174+
if let Some(idx) = fields.iter().position(|f| f.name() == "nullCount") {
1175+
let null_count_array = stats.column(idx);
1176+
if let Some(null_struct) = null_count_array.as_any().downcast_ref::<StructArray>() {
1177+
let null_fields: Vec<&str> = null_struct
1178+
.fields()
1179+
.iter()
1180+
.map(|f| f.name().as_str())
1181+
.collect();
1182+
for col in expected_columns {
1183+
if null_fields.contains(&col.as_str()) {
1184+
present_null_count.push(col.clone());
1185+
} else {
1186+
missing_null_count.push(col.clone());
1187+
}
1188+
}
1189+
}
1190+
}
1191+
1192+
// Get min/max columns
1193+
let mut min_max_columns = Vec::new();
1194+
if let Some(idx) = fields.iter().position(|f| f.name() == "minValues") {
1195+
let min_array = stats.column(idx);
1196+
if let Some(min_struct) = min_array.as_any().downcast_ref::<StructArray>() {
1197+
for field in min_struct.fields() {
1198+
min_max_columns.push(field.name().clone());
1199+
}
1200+
}
1201+
}
1202+
1203+
Ok(StatsVerificationResult {
1204+
num_records,
1205+
tight_bounds,
1206+
present_null_count_columns: present_null_count,
1207+
missing_null_count_columns: missing_null_count,
1208+
min_max_columns,
1209+
})
1210+
}
1211+
1212+
/// Verify stats and return a detailed human-readable string.
1213+
#[allow(unused)]
1214+
pub(crate) fn verify_detailed(
1215+
stats: &StructArray,
1216+
expected_columns: &[String],
1217+
) -> DeltaResult<String> {
1218+
let result = Self::verify(stats, expected_columns)?;
1219+
Ok(format!(
1220+
"Stats: numRecords={}, tightBounds={}, nullCount=[{}], minMax=[{}]",
1221+
result.num_records,
1222+
result.tight_bounds,
1223+
result.present_null_count_columns.join(", "),
1224+
result.min_max_columns.join(", ")
1225+
))
1226+
}
1227+
}
1228+
1229+
/// Result of stats verification.
1230+
#[allow(unused)]
1231+
pub(crate) struct StatsVerificationResult {
1232+
pub num_records: i64,
1233+
pub tight_bounds: bool,
1234+
pub present_null_count_columns: Vec<String>,
1235+
pub missing_null_count_columns: Vec<String>,
1236+
pub min_max_columns: Vec<String>,
1237+
}
1238+
1239+
impl StatsVerificationResult {
1240+
/// Returns true if all expected columns have nullCount stats.
1241+
pub fn has_all_null_counts(&self) -> bool {
1242+
self.missing_null_count_columns.is_empty()
1243+
}
1244+
}
1245+
1246+
#[cfg(test)]
1247+
mod verifier_tests {
1248+
use super::*;
1249+
use crate::arrow::datatypes::Schema;
1250+
1251+
#[test]
1252+
fn test_stats_verifier_valid_stats() {
1253+
let schema = Arc::new(Schema::new(vec![
1254+
Field::new("id", DataType::Int64, false),
1255+
Field::new("value", DataType::Utf8, true),
1256+
]));
1257+
1258+
let batch = RecordBatch::try_new(
1259+
schema.clone(),
1260+
vec![
1261+
Arc::new(Int64Array::from(vec![1, 2, 3])),
1262+
Arc::new(StringArray::from(vec![Some("a"), None, Some("c")])),
1263+
],
1264+
)
1265+
.unwrap();
1266+
1267+
let mut collector =
1268+
StatisticsCollector::new(schema, &["id".to_string(), "value".to_string()]);
1269+
collector.update(&batch, None).unwrap();
1270+
let stats = collector.finalize().unwrap();
1271+
1272+
let result =
1273+
StatsVerifier::verify(&stats, &["id".to_string(), "value".to_string()]).unwrap();
1274+
1275+
assert_eq!(result.num_records, 3);
1276+
assert!(result.tight_bounds);
1277+
assert!(result.has_all_null_counts());
1278+
assert_eq!(result.present_null_count_columns.len(), 2);
1279+
}
1280+
1281+
#[test]
1282+
fn test_stats_verifier_detailed_output() {
1283+
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
1284+
1285+
let batch = RecordBatch::try_new(
1286+
schema.clone(),
1287+
vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
1288+
)
1289+
.unwrap();
1290+
1291+
let mut collector = StatisticsCollector::new(schema, &["id".to_string()]);
1292+
collector.update(&batch, None).unwrap();
1293+
let stats = collector.finalize().unwrap();
1294+
1295+
let detailed = StatsVerifier::verify_detailed(&stats, &["id".to_string()]).unwrap();
1296+
assert!(detailed.contains("numRecords=3"));
1297+
assert!(detailed.contains("tightBounds=true"));
1298+
}
1299+
}

kernel/src/transaction/mod.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,65 @@ use crate::{
4141
};
4242
use delta_kernel_derive::internal_api;
4343

44+
/// Visitor to validate statistics in add file metadata.
45+
/// Uses RowVisitor pattern to extract and validate stats from EngineData.
46+
struct StatsValidationVisitor {
47+
rows_validated: usize,
48+
rows_with_num_records: usize,
49+
errors: Vec<String>,
50+
}
51+
52+
impl StatsValidationVisitor {
53+
fn new() -> Self {
54+
Self {
55+
rows_validated: 0,
56+
rows_with_num_records: 0,
57+
errors: Vec::new(),
58+
}
59+
}
60+
61+
fn validate(&self) -> DeltaResult<()> {
62+
if self.rows_validated == 0 {
63+
return Err(Error::generic("No rows to validate"));
64+
}
65+
if self.rows_with_num_records == 0 {
66+
// This is a warning case, not an error - stats might be missing
67+
// but we don't fail the commit for it
68+
}
69+
if !self.errors.is_empty() {
70+
return Err(Error::generic(format!(
71+
"Stats validation errors: {}",
72+
self.errors.join("; ")
73+
)));
74+
}
75+
Ok(())
76+
}
77+
}
78+
79+
impl RowVisitor for StatsValidationVisitor {
80+
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
81+
static NAMES: LazyLock<Vec<ColumnName>> = LazyLock::new(|| vec![column_name!("stats")]);
82+
static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| {
83+
vec![DataType::STRING] // stats is a JSON string
84+
});
85+
(NAMES.as_slice(), TYPES.as_slice())
86+
}
87+
88+
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
89+
for row_index in 0..row_count {
90+
self.rows_validated += 1;
91+
if let Some(stats_str) = getters[0].get_opt(row_index, "stats")? {
92+
let stats_str: String = stats_str;
93+
// Check if stats has numRecords
94+
if stats_str.contains("\"numRecords\"") {
95+
self.rows_with_num_records += 1;
96+
}
97+
}
98+
}
99+
Ok(())
100+
}
101+
}
102+
44103
/// Type alias for an iterator of [`EngineData`] results.
45104
pub(crate) type EngineDataResultIterator<'a> =
46105
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;
@@ -333,6 +392,14 @@ impl Transaction {
333392
/// transaction in case of a conflict so the user can retry, etc.)
334393
/// - Err(Error) indicates a non-retryable error (e.g. logic/validation error).
335394
pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
395+
// Step 0: Validate stats in add file metadata
396+
// This ensures all files have valid stats before committing
397+
for add_metadata in &self.add_files_metadata {
398+
let mut validator = StatsValidationVisitor::new();
399+
validator.visit_rows_of(add_metadata.as_ref())?;
400+
validator.validate()?;
401+
}
402+
336403
// Step 1: Check for duplicate app_ids and generate set transactions (`txn`)
337404
// Note: The commit info must always be the first action in the commit but we generate it in
338405
// step 2 to fail early on duplicate transaction appIds
@@ -903,6 +970,37 @@ impl Transaction {
903970
self.add_files_metadata.push(add_metadata);
904971
}
905972

973+
/// Add files with statistics validation.
974+
///
975+
/// Similar to [`add_files`], but validates that the metadata contains valid statistics
976+
/// before adding. Returns an error if validation fails.
977+
///
978+
/// [`add_files`]: Transaction::add_files
979+
#[allow(unused)]
980+
pub fn add_files_validated(&mut self, add_metadata: Box<dyn EngineData>) -> DeltaResult<()> {
981+
let mut validator = StatsValidationVisitor::new();
982+
validator.visit_rows_of(add_metadata.as_ref())?;
983+
validator.validate()?;
984+
self.add_files_metadata.push(add_metadata);
985+
Ok(())
986+
}
987+
988+
/// Validate statistics in add file metadata without modifying the transaction.
989+
///
990+
/// Returns a summary string describing the validation results.
991+
#[allow(unused)]
992+
pub fn validate_add_files_stats(add_metadata: &dyn EngineData) -> DeltaResult<String> {
993+
let mut validator = StatsValidationVisitor::new();
994+
validator.visit_rows_of(add_metadata)?;
995+
validator.validate()?;
996+
Ok(format!(
997+
"Validated {} rows, {} had numRecords. Errors: {}",
998+
validator.rows_validated,
999+
validator.rows_with_num_records,
1000+
validator.errors.join("; ")
1001+
))
1002+
}
1003+
9061004
/// Generate add actions, handling row tracking internally if needed
9071005
fn generate_adds<'a>(
9081006
&'a self,

0 commit comments

Comments
 (0)