@@ -3,7 +3,6 @@ use std::sync::{Arc, LazyLock};
33
44use tracing:: { debug, error} ;
55
6- use crate :: actions:: get_log_add_schema;
76use crate :: actions:: visitors:: SelectionVectorVisitor ;
87use crate :: error:: DeltaResult ;
98use crate :: expressions:: {
@@ -14,14 +13,13 @@ use crate::expressions::{
1413use crate :: kernel_predicates:: {
1514 DataSkippingPredicateEvaluator , KernelPredicateEvaluator , KernelPredicateEvaluatorDefaults ,
1615} ;
17- use crate :: schema:: { DataType , SchemaRef , SchemaTransform , StructField , StructType } ;
16+ use crate :: schema:: { DataType , SchemaRef } ;
1817use crate :: {
1918 Engine , EngineData , ExpressionEvaluator , JsonHandler , PredicateEvaluator , RowVisitor as _,
2019} ;
2120
2221pub ( crate ) mod stats_schema;
2322
24- use stats_schema:: { NullCountStatsTransform , NullableStatsTransform } ;
2523#[ cfg( test) ]
2624mod tests;
2725
@@ -55,46 +53,50 @@ fn as_sql_data_skipping_predicate(pred: &Pred) -> Option<Pred> {
5553pub ( crate ) struct DataSkippingFilter {
5654 stats_schema : SchemaRef ,
5755 select_stats_evaluator : Arc < dyn ExpressionEvaluator > ,
56+ /// Evaluator for extracting stats_parsed from checkpoints.
57+ /// Only present when the checkpoint has compatible pre-parsed stats.
58+ select_stats_parsed_evaluator : Option < Arc < dyn ExpressionEvaluator > > ,
5859 skipping_evaluator : Arc < dyn PredicateEvaluator > ,
5960 filter_evaluator : Arc < dyn PredicateEvaluator > ,
6061 json_handler : Arc < dyn JsonHandler > ,
6162}
6263
6364impl DataSkippingFilter {
64- /// Creates a new data skipping filter. Returns None if there is no predicate, or the predicate
65- /// is ineligible for data skipping.
65+ /// Creates a new data skipping filter. Returns None if there is no predicate/stats_schema,
66+ /// or the predicate is ineligible for data skipping.
6667 ///
6768 /// NOTE: None is equivalent to a trivial filter that always returns TRUE (= keeps all files),
6869 /// but using an Option lets the engine easily avoid the overhead of applying trivial filters.
70+ ///
71+ /// `checkpoint_read_schema` is the schema used to read checkpoint files, which includes
72+ /// `stats_parsed` for data skipping optimization. This schema is needed to create the
73+ /// evaluator that extracts stats_parsed from the actions.
74+ ///
75+ /// `has_compatible_stats_parsed` indicates whether the checkpoint has compatible pre-parsed
76+ /// stats. When true, checkpoint batches use stats_parsed directly instead of parsing JSON.
6977 pub ( crate ) fn new (
7078 engine : & dyn Engine ,
71- physical_predicate : Option < ( PredicateRef , SchemaRef ) > ,
79+ predicate : Option < PredicateRef > ,
80+ stats_schema : Option < SchemaRef > ,
81+ checkpoint_read_schema : SchemaRef ,
82+ has_compatible_stats_parsed : bool ,
7283 ) -> Option < Self > {
7384 static STATS_EXPR : LazyLock < ExpressionRef > =
7485 LazyLock :: new ( || Arc :: new ( column_expr ! ( "add.stats" ) ) ) ;
86+ static STATS_PARSED_EXPR : LazyLock < ExpressionRef > =
87+ LazyLock :: new ( || Arc :: new ( column_expr ! ( "add.stats_parsed" ) ) ) ;
7588 static FILTER_PRED : LazyLock < PredicateRef > =
7689 LazyLock :: new ( || Arc :: new ( column_expr ! ( "output" ) . distinct ( Expr :: literal ( false ) ) ) ) ;
7790
78- let ( predicate, referenced_schema) = physical_predicate?;
91+ let predicate = predicate?;
92+ let stats_schema = stats_schema?;
7993 debug ! ( "Creating a data skipping filter for {:#?}" , predicate) ;
8094
81- let stats_schema = NullableStatsTransform
82- . transform_struct ( & referenced_schema) ?
83- . into_owned ( ) ;
84-
85- let nullcount_schema = NullCountStatsTransform
86- . transform_struct ( & stats_schema) ?
87- . into_owned ( ) ;
88- let stats_schema = Arc :: new ( StructType :: new_unchecked ( [
89- StructField :: nullable ( "numRecords" , DataType :: LONG ) ,
90- StructField :: nullable ( "nullCount" , nullcount_schema) ,
91- StructField :: nullable ( "minValues" , stats_schema. clone ( ) ) ,
92- StructField :: nullable ( "maxValues" , stats_schema) ,
93- ] ) ) ;
94-
9595 // Skipping happens in several steps:
9696 //
97- // 1. The stats selector fetches add.stats from the metadata
97+ // 1. The stats selector fetches add.stats or add.stats_parsed from the metadata.
98+ // For checkpoint batches with compatible stats_parsed, we use stats_parsed directly.
99+ // Otherwise, we parse add.stats (JSON string) to a stats struct.
98100 //
99101 // 2. The predicate (skipping evaluator) produces false for any file whose stats prove we
100102 // can safely skip it. A value of true means the stats say we must keep the file, and
@@ -106,7 +108,7 @@ impl DataSkippingFilter {
106108 let select_stats_evaluator = engine
107109 . evaluation_handler ( )
108110 . new_expression_evaluator (
109- get_log_add_schema ( ) . clone ( ) ,
111+ checkpoint_read_schema . clone ( ) ,
110112 STATS_EXPR . clone ( ) ,
111113 DataType :: STRING ,
112114 )
@@ -115,6 +117,23 @@ impl DataSkippingFilter {
115117 . inspect_err ( |e| error ! ( "Failed to create select stats evaluator: {e}" ) )
116118 . ok ( ) ?;
117119
120+ // Only create stats_parsed evaluator when checkpoint has compatible pre-parsed stats
121+ let select_stats_parsed_evaluator = if has_compatible_stats_parsed {
122+ engine
123+ . evaluation_handler ( )
124+ . new_expression_evaluator (
125+ checkpoint_read_schema. clone ( ) ,
126+ STATS_PARSED_EXPR . clone ( ) ,
127+ DataType :: Struct ( Box :: new ( stats_schema. as_ref ( ) . clone ( ) ) ) ,
128+ )
129+ . inspect_err ( |e| {
130+ debug ! ( "stats_parsed evaluator not available (falling back to JSON): {e}" )
131+ } )
132+ . ok ( )
133+ } else {
134+ None
135+ } ;
136+
118137 let skipping_evaluator = engine
119138 . evaluation_handler ( )
120139 . new_predicate_evaluator (
@@ -137,6 +156,7 @@ impl DataSkippingFilter {
137156 Some ( Self {
138157 stats_schema,
139158 select_stats_evaluator,
159+ select_stats_parsed_evaluator,
140160 skipping_evaluator,
141161 filter_evaluator,
142162 json_handler : engine. json_handler ( ) ,
@@ -145,34 +165,51 @@ impl DataSkippingFilter {
145165
146166 /// Apply the DataSkippingFilter to an EngineData batch of actions. Returns a selection vector
147167 /// which can be applied to the actions to find those that passed data skipping.
148- pub ( crate ) fn apply ( & self , actions : & dyn EngineData ) -> DeltaResult < Vec < bool > > {
149- // retrieve and parse stats from actions data
150- let stats = self . select_stats_evaluator . evaluate ( actions) ?;
151- assert_eq ! ( stats. len( ) , actions. len( ) ) ;
152- let parsed_stats = self
153- . json_handler
154- . parse_json ( stats, self . stats_schema . clone ( ) ) ?;
155- assert_eq ! ( parsed_stats. len( ) , actions. len( ) ) ;
168+ ///
169+ /// `is_log_batch` indicates whether this batch is from a commit log (`true`) or checkpoint (`false`).
170+ /// Checkpoint batches may have pre-parsed stats (`stats_parsed`) that can be used directly
171+ /// instead of parsing JSON. Commit batches only have JSON stats.
172+ pub ( crate ) fn apply (
173+ & self ,
174+ actions : & dyn EngineData ,
175+ is_log_batch : bool ,
176+ ) -> DeltaResult < Vec < bool > > {
177+ // Get the final stats by either:
178+ // 1. Using stats_parsed directly (for checkpoint batches with compatible stats)
179+ // 2. Parsing JSON (for commit batches or when stats_parsed unavailable)
180+ let final_stats = if let Some ( stats_parsed_evaluator) = ( !is_log_batch)
181+ . then_some ( ( ) )
182+ . and ( self . select_stats_parsed_evaluator . as_ref ( ) )
183+ {
184+ // Checkpoint batch with compatible stats_parsed - use it directly
185+ let stats_parsed = stats_parsed_evaluator. evaluate ( actions) ?;
186+ debug ! (
187+ "Using stats_parsed from checkpoint ({} rows)" ,
188+ stats_parsed. len( )
189+ ) ;
190+ stats_parsed
191+ } else {
192+ // Commit batch or no stats_parsed evaluator - parse JSON
193+ let stats_json = self . select_stats_evaluator . evaluate ( actions) ?;
194+ assert_eq ! ( stats_json. len( ) , actions. len( ) ) ;
195+ self . json_handler
196+ . parse_json ( stats_json, self . stats_schema . clone ( ) ) ?
197+ } ;
156198
157- // evaluate the predicate on the parsed stats, then convert to selection vector
158- let skipping_predicate = self . skipping_evaluator . evaluate ( & * parsed_stats) ?;
199+ assert_eq ! ( final_stats. len( ) , actions. len( ) ) ;
200+
201+ // Evaluate predicate on the stats
202+ let skipping_predicate = self . skipping_evaluator . evaluate ( & * final_stats) ?;
159203 assert_eq ! ( skipping_predicate. len( ) , actions. len( ) ) ;
160204 let selection_vector = self
161205 . filter_evaluator
162206 . evaluate ( skipping_predicate. as_ref ( ) ) ?;
163207 assert_eq ! ( selection_vector. len( ) , actions. len( ) ) ;
164208
165- // visit the engine's selection vector to produce a Vec<bool>
209+ // Visit the engine's selection vector to produce a Vec<bool>
166210 let mut visitor = SelectionVectorVisitor :: default ( ) ;
167211 visitor. visit_rows_of ( selection_vector. as_ref ( ) ) ?;
168212 Ok ( visitor. selection_vector )
169-
170- // TODO(zach): add some debug info about data skipping that occurred
171- // let before_count = actions.length();
172- // debug!(
173- // "number of actions before/after data skipping: {before_count} / {}",
174- // filtered_actions.num_rows()
175- // );
176213 }
177214}
178215
0 commit comments