
Commit 0e03b17

ps

Parent: 9619ed9

24 files changed: +380 / -93 lines

kernel/src/log_replay.rs

Lines changed: 7 additions & 2 deletions
@@ -310,13 +310,18 @@ pub(crate) trait LogReplayProcessor: Sized {
     ///
     /// # Parameters
     /// - `batch`: A reference to the batch of actions to be processed.
+    /// - `is_log_batch`: Whether this batch is from a commit log (`true`) or checkpoint (`false`).
     ///
     /// # Returns
     /// A `DeltaResult<Vec<bool>>`, where each boolean indicates if the corresponding row should be included.
     /// If no filter is provided, all rows are selected.
-    fn build_selection_vector(&self, batch: &dyn EngineData) -> DeltaResult<Vec<bool>> {
+    fn build_selection_vector(
+        &self,
+        batch: &dyn EngineData,
+        is_log_batch: bool,
+    ) -> DeltaResult<Vec<bool>> {
         match self.data_skipping_filter() {
-            Some(filter) => filter.apply(batch),
+            Some(filter) => filter.apply(batch, is_log_batch),
            None => Ok(vec![true; batch.len()]), // If no filter is provided, select all rows
        }
    }

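For orientation, here is a minimal self-contained sketch of the default-method pattern this hunk extends, using toy stand-ins (a hypothetical `Filter` over `u32` batches) rather than the kernel's `EngineData` and `DataSkippingFilter` types: the default implementation forwards the batch plus the new `is_log_batch` flag to the optional filter, and selects every row when no filter is configured.

// Toy stand-in for DataSkippingFilter; the even/odd rule is illustrative only.
struct Filter;

impl Filter {
    // Toy rule: commit batches keep only even values; checkpoint batches keep everything.
    fn apply(&self, batch: &[u32], is_log_batch: bool) -> Vec<bool> {
        batch.iter().map(|v| !is_log_batch || v % 2 == 0).collect()
    }
}

trait ProcessorSketch {
    fn data_skipping_filter(&self) -> Option<&Filter>;

    // Mirrors build_selection_vector: delegate when a filter exists, else select all rows.
    fn build_selection_vector(&self, batch: &[u32], is_log_batch: bool) -> Vec<bool> {
        match self.data_skipping_filter() {
            Some(filter) => filter.apply(batch, is_log_batch),
            None => vec![true; batch.len()],
        }
    }
}

struct WithFilter;
impl ProcessorSketch for WithFilter {
    fn data_skipping_filter(&self) -> Option<&Filter> {
        Some(&Filter)
    }
}

fn main() {
    // Commit batch (is_log_batch = true): the toy filter keeps only even values.
    assert_eq!(
        WithFilter.build_selection_vector(&[1, 2, 3], true),
        vec![false, true, false]
    );
    // Checkpoint batch (is_log_batch = false): the toy filter keeps everything.
    assert_eq!(WithFilter.build_selection_vector(&[1, 2, 3], false), vec![true; 3]);
}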
kernel/src/parallel/sequential_phase.rs

Lines changed: 16 additions & 3 deletions
@@ -201,10 +201,11 @@ impl<P: LogReplayProcessor> Iterator for SequentialPhase<P> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::actions::get_log_add_schema;
+    use crate::log_segment::CheckpointReadInfo;
     use crate::scan::log_replay::ScanLogReplayProcessor;
     use crate::scan::state_info::StateInfo;
     use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
-    use std::sync::Arc;
 
     /// Core helper function to verify sequential processing with expected adds and sidecars.
     fn verify_sequential_processing(
@@ -222,7 +223,13 @@ mod tests {
             (),
         )?);
 
-        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
+        // Use base log add schema for tests - no stats_parsed optimization
+        let checkpoint_info = CheckpointReadInfo {
+            has_stats_parsed: false,
+            checkpoint_read_schema: get_log_add_schema().clone(),
+        };
+
+        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?;
         let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?;
 
         // Process all batches and collect Add file paths
@@ -313,7 +320,13 @@ mod tests {
             (),
         )?);
 
-        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
+        // Use base log add schema for tests - no stats_parsed optimization
+        let checkpoint_info = CheckpointReadInfo {
+            has_stats_parsed: false,
+            checkpoint_read_schema: get_log_add_schema().clone(),
+        };
+
+        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info, checkpoint_info)?;
         let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?;
 
         // Call next() once but don't exhaust the iterator

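For reference, the field shape that `CheckpointReadInfo` must expose can be inferred from the test usage above; the following is only a sketch of that inferred shape (the real definition lives in kernel/src/log_segment.rs and may carry additional fields or derives):

use crate::schema::SchemaRef;

/// Sketch of `CheckpointReadInfo` as implied by these tests.
pub(crate) struct CheckpointReadInfo {
    /// True when the checkpoint carries a compatible pre-parsed `stats_parsed` column.
    pub(crate) has_stats_parsed: bool,
    /// Schema used to read checkpoint files; the base log add schema when the
    /// stats_parsed optimization does not apply.
    pub(crate) checkpoint_read_schema: SchemaRef,
}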
kernel/src/scan/data_skipping.rs

Lines changed: 78 additions & 41 deletions
@@ -3,7 +3,6 @@ use std::sync::{Arc, LazyLock};
 
 use tracing::{debug, error};
 
-use crate::actions::get_log_add_schema;
 use crate::actions::visitors::SelectionVectorVisitor;
 use crate::error::DeltaResult;
 use crate::expressions::{
@@ -14,14 +13,13 @@ use crate::expressions::{
 use crate::kernel_predicates::{
     DataSkippingPredicateEvaluator, KernelPredicateEvaluator, KernelPredicateEvaluatorDefaults,
 };
-use crate::schema::{DataType, SchemaRef, SchemaTransform, StructField, StructType};
+use crate::schema::{DataType, SchemaRef};
 use crate::{
     Engine, EngineData, ExpressionEvaluator, JsonHandler, PredicateEvaluator, RowVisitor as _,
 };
 
 pub(crate) mod stats_schema;
 
-use stats_schema::{NullCountStatsTransform, NullableStatsTransform};
 #[cfg(test)]
 mod tests;
 
@@ -55,46 +53,50 @@ fn as_sql_data_skipping_predicate(pred: &Pred) -> Option<Pred> {
 pub(crate) struct DataSkippingFilter {
     stats_schema: SchemaRef,
     select_stats_evaluator: Arc<dyn ExpressionEvaluator>,
+    /// Evaluator for extracting stats_parsed from checkpoints.
+    /// Only present when the checkpoint has compatible pre-parsed stats.
+    select_stats_parsed_evaluator: Option<Arc<dyn ExpressionEvaluator>>,
     skipping_evaluator: Arc<dyn PredicateEvaluator>,
     filter_evaluator: Arc<dyn PredicateEvaluator>,
     json_handler: Arc<dyn JsonHandler>,
 }
 
 impl DataSkippingFilter {
-    /// Creates a new data skipping filter. Returns None if there is no predicate, or the predicate
-    /// is ineligible for data skipping.
+    /// Creates a new data skipping filter. Returns None if there is no predicate/stats_schema,
+    /// or the predicate is ineligible for data skipping.
     ///
     /// NOTE: None is equivalent to a trivial filter that always returns TRUE (= keeps all files),
     /// but using an Option lets the engine easily avoid the overhead of applying trivial filters.
+    ///
+    /// `checkpoint_read_schema` is the schema used to read checkpoint files, which includes
+    /// `stats_parsed` for data skipping optimization. This schema is needed to create the
+    /// evaluator that extracts stats_parsed from the actions.
+    ///
+    /// `has_compatible_stats_parsed` indicates whether the checkpoint has compatible pre-parsed
+    /// stats. When true, checkpoint batches use stats_parsed directly instead of parsing JSON.
     pub(crate) fn new(
         engine: &dyn Engine,
-        physical_predicate: Option<(PredicateRef, SchemaRef)>,
+        predicate: Option<PredicateRef>,
+        stats_schema: Option<SchemaRef>,
+        checkpoint_read_schema: SchemaRef,
+        has_compatible_stats_parsed: bool,
     ) -> Option<Self> {
         static STATS_EXPR: LazyLock<ExpressionRef> =
             LazyLock::new(|| Arc::new(column_expr!("add.stats")));
+        static STATS_PARSED_EXPR: LazyLock<ExpressionRef> =
+            LazyLock::new(|| Arc::new(column_expr!("add.stats_parsed")));
         static FILTER_PRED: LazyLock<PredicateRef> =
             LazyLock::new(|| Arc::new(column_expr!("output").distinct(Expr::literal(false))));
 
-        let (predicate, referenced_schema) = physical_predicate?;
+        let predicate = predicate?;
+        let stats_schema = stats_schema?;
         debug!("Creating a data skipping filter for {:#?}", predicate);
 
-        let stats_schema = NullableStatsTransform
-            .transform_struct(&referenced_schema)?
-            .into_owned();
-
-        let nullcount_schema = NullCountStatsTransform
-            .transform_struct(&stats_schema)?
-            .into_owned();
-        let stats_schema = Arc::new(StructType::new_unchecked([
-            StructField::nullable("numRecords", DataType::LONG),
-            StructField::nullable("nullCount", nullcount_schema),
-            StructField::nullable("minValues", stats_schema.clone()),
-            StructField::nullable("maxValues", stats_schema),
-        ]));
-
         // Skipping happens in several steps:
         //
-        // 1. The stats selector fetches add.stats from the metadata
+        // 1. The stats selector fetches add.stats or add.stats_parsed from the metadata.
+        //    For checkpoint batches with compatible stats_parsed, we use stats_parsed directly.
+        //    Otherwise, we parse add.stats (JSON string) to a stats struct.
         //
         // 2. The predicate (skipping evaluator) produces false for any file whose stats prove we
         //    can safely skip it. A value of true means the stats say we must keep the file, and
@@ -106,7 +108,7 @@ impl DataSkippingFilter {
         let select_stats_evaluator = engine
             .evaluation_handler()
             .new_expression_evaluator(
-                get_log_add_schema().clone(),
+                checkpoint_read_schema.clone(),
                 STATS_EXPR.clone(),
                 DataType::STRING,
             )
@@ -115,6 +117,23 @@ impl DataSkippingFilter {
             .inspect_err(|e| error!("Failed to create select stats evaluator: {e}"))
             .ok()?;
 
+        // Only create stats_parsed evaluator when checkpoint has compatible pre-parsed stats
+        let select_stats_parsed_evaluator = if has_compatible_stats_parsed {
+            engine
+                .evaluation_handler()
+                .new_expression_evaluator(
+                    checkpoint_read_schema.clone(),
+                    STATS_PARSED_EXPR.clone(),
+                    DataType::Struct(Box::new(stats_schema.as_ref().clone())),
+                )
+                .inspect_err(|e| {
+                    debug!("stats_parsed evaluator not available (falling back to JSON): {e}")
+                })
+                .ok()
+        } else {
+            None
+        };
+
         let skipping_evaluator = engine
             .evaluation_handler()
             .new_predicate_evaluator(
@@ -137,6 +156,7 @@ impl DataSkippingFilter {
         Some(Self {
             stats_schema,
             select_stats_evaluator,
+            select_stats_parsed_evaluator,
            skipping_evaluator,
            filter_evaluator,
            json_handler: engine.json_handler(),
@@ -145,34 +165,51 @@ impl DataSkippingFilter {
 
     /// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector
     /// which can be applied to the actions to find those that passed data skipping.
-    pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> {
-        // retrieve and parse stats from actions data
-        let stats = self.select_stats_evaluator.evaluate(actions)?;
-        assert_eq!(stats.len(), actions.len());
-        let parsed_stats = self
-            .json_handler
-            .parse_json(stats, self.stats_schema.clone())?;
-        assert_eq!(parsed_stats.len(), actions.len());
+    ///
+    /// `is_log_batch` indicates whether this batch is from a commit log (`true`) or checkpoint (`false`).
+    /// Checkpoint batches may have pre-parsed stats (`stats_parsed`) that can be used directly
+    /// instead of parsing JSON. Commit batches only have JSON stats.
+    pub(crate) fn apply(
+        &self,
+        actions: &dyn EngineData,
+        is_log_batch: bool,
+    ) -> DeltaResult<Vec<bool>> {
+        // Get the final stats by either:
+        // 1. Using stats_parsed directly (for checkpoint batches with compatible stats)
+        // 2. Parsing JSON (for commit batches or when stats_parsed unavailable)
+        let final_stats = if let Some(stats_parsed_evaluator) = (!is_log_batch)
+            .then_some(())
+            .and(self.select_stats_parsed_evaluator.as_ref())
+        {
+            // Checkpoint batch with compatible stats_parsed - use it directly
+            let stats_parsed = stats_parsed_evaluator.evaluate(actions)?;
+            debug!(
+                "Using stats_parsed from checkpoint ({} rows)",
+                stats_parsed.len()
+            );
+            stats_parsed
+        } else {
+            // Commit batch or no stats_parsed evaluator - parse JSON
+            let stats_json = self.select_stats_evaluator.evaluate(actions)?;
+            assert_eq!(stats_json.len(), actions.len());
+            self.json_handler
+                .parse_json(stats_json, self.stats_schema.clone())?
+        };
 
-        // evaluate the predicate on the parsed stats, then convert to selection vector
-        let skipping_predicate = self.skipping_evaluator.evaluate(&*parsed_stats)?;
+        assert_eq!(final_stats.len(), actions.len());
+
+        // Evaluate predicate on the stats
+        let skipping_predicate = self.skipping_evaluator.evaluate(&*final_stats)?;
         assert_eq!(skipping_predicate.len(), actions.len());
         let selection_vector = self
             .filter_evaluator
             .evaluate(skipping_predicate.as_ref())?;
         assert_eq!(selection_vector.len(), actions.len());
 
-        // visit the engine's selection vector to produce a Vec<bool>
+        // Visit the engine's selection vector to produce a Vec<bool>
         let mut visitor = SelectionVectorVisitor::default();
         visitor.visit_rows_of(selection_vector.as_ref())?;
         Ok(visitor.selection_vector)
-
-        // TODO(zach): add some debug info about data skipping that occurred
-        // let before_count = actions.length();
-        // debug!(
-        //     "number of actions before/after data skipping: {before_count} / {}",
-        //     filtered_actions.num_rows()
-        // );
     }
 }

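The `(!is_log_batch).then_some(()).and(...)` dispatch in `apply` is terse; the self-contained toy below (a hypothetical `use_stats_parsed` helper, not kernel code) spells out the truth table it implements: the stats_parsed fast path is taken only when the batch is a checkpoint batch AND the evaluator was successfully built, and every other case falls back to JSON parsing.

fn use_stats_parsed(is_log_batch: bool, evaluator: Option<&str>) -> bool {
    // Mirrors the kernel's (!is_log_batch).then_some(()).and(evaluator) pattern:
    // Option<()> gates on the batch kind, then `and` requires the evaluator too.
    (!is_log_batch).then_some(()).and(evaluator).is_some()
}

fn main() {
    assert!(!use_stats_parsed(true, Some("ev"))); // commit batch: always parse JSON stats
    assert!(use_stats_parsed(false, Some("ev"))); // checkpoint + evaluator: use stats_parsed
    assert!(!use_stats_parsed(false, None)); // checkpoint without evaluator: JSON fallback
}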
kernel/src/scan/data_skipping/stats_schema.rs

Lines changed: 4 additions & 0 deletions
@@ -175,13 +175,17 @@ impl<'a> SchemaTransform<'a> for NullableStatsTransform {
     }
 }
 
+<<<<<<< HEAD
 /// Converts a stats schema into a nullCount schema where all leaf fields become LONG.
 ///
 /// The nullCount struct field tracks the number of null values for each column.
 /// All leaf fields (primitives, arrays, maps, variants) are converted to LONG type
 /// since null counts are always integers, while struct fields are recursed into
 /// to preserve the nested structure.
 #[allow(unused)]
+=======
+/// Transforms a schema to convert all primitive fields to LONG (for null count stats).
+>>>>>>> 53871208 (ps)
 pub(crate) struct NullCountStatsTransform;
 impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
     fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {

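As a worked illustration of the doc comment carried in this hunk (a toy type model, not the kernel's `SchemaTransform` machinery): a nullCount schema preserves struct nesting but turns every leaf field into a LONG, since null counts are always integers.

#[derive(Debug, PartialEq)]
enum ToyType {
    Long,
    String,
    Struct(Vec<(String, ToyType)>),
}

// Leaves become Long (null counts are integers); structs are recursed into.
fn to_null_count(t: &ToyType) -> ToyType {
    match t {
        ToyType::Struct(fields) => ToyType::Struct(
            fields
                .iter()
                .map(|(name, f)| (name.clone(), to_null_count(f)))
                .collect(),
        ),
        _ => ToyType::Long,
    }
}

fn main() {
    let stats = ToyType::Struct(vec![
        ("id".into(), ToyType::Long),
        ("user".into(), ToyType::Struct(vec![("name".into(), ToyType::String)])),
    ]);
    // Both leaves are now Long; the nested struct shape is preserved.
    assert_eq!(
        to_null_count(&stats),
        ToyType::Struct(vec![
            ("id".into(), ToyType::Long),
            ("user".into(), ToyType::Struct(vec![("name".into(), ToyType::Long)])),
        ])
    );
}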