diff --git a/kernel/src/predicates/mod.rs b/kernel/src/predicates/mod.rs
index e47da293f8..c2d870445f 100644
--- a/kernel/src/predicates/mod.rs
+++ b/kernel/src/predicates/mod.rs
@@ -534,8 +534,6 @@ impl ResolveColumnAsScalar for EmptyColumnResolver {
     }
 }
 
-// In testing, it is convenient to just build a hashmap of scalar values.
-#[cfg(test)]
 impl ResolveColumnAsScalar for std::collections::HashMap<ColumnName, Scalar> {
     fn resolve_column(&self, col: &ColumnName) -> Option<Scalar> {
         self.get(col).cloned()
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index 33bc87075d..9bf96c0307 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -6,6 +6,7 @@ use itertools::Itertools;
 use tracing::debug;
 
 use super::data_skipping::DataSkippingFilter;
+use super::partition_skipping::PartitionSkippingFilter;
 use super::{ScanData, Transform};
 use crate::actions::get_log_add_schema;
 use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
@@ -30,7 +31,11 @@ impl FileActionKey {
 }
 
 struct LogReplayScanner {
-    filter: Option<DataSkippingFilter>,
+    /// Filter based on partition values
+    partition_filter: Option<PartitionSkippingFilter>,
+
+    /// Filter based on min/max values in the statistics
+    data_filter: Option<DataSkippingFilter>,
 
     /// A set of (data file path, dv_unique_id) pairs that have been seen thus
     /// far in the log. This is used to filter out files with Remove actions as
@@ -248,9 +253,18 @@ fn get_add_transform_expr() -> Expression {
 
 impl LogReplayScanner {
     /// Create a new [`LogReplayScanner`] instance
-    fn new(engine: &dyn Engine, physical_predicate: Option<(ExpressionRef, SchemaRef)>) -> Self {
+    fn new(
+        engine: &dyn Engine,
+        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
+        partition_columns: &[String],
+    ) -> Self {
         Self {
-            filter: DataSkippingFilter::new(engine, physical_predicate),
+            partition_filter: PartitionSkippingFilter::new(
+                engine,
+                physical_predicate.clone(),
+                partition_columns,
+            ),
+            data_filter: DataSkippingFilter::new(engine, physical_predicate),
             seen: Default::default(),
         }
     }
@@ -265,11 +279,31 @@ impl LogReplayScanner {
     ) -> DeltaResult<ScanData> {
         // Apply data skipping to get back a selection vector for actions that passed skipping. We
         // will update the vector below as log replay identifies duplicates that should be ignored.
-        let selection_vector = match &self.filter {
+        let data_selection_vector = match &self.data_filter {
             Some(filter) => filter.apply(actions)?,
             None => vec![true; actions.len()],
         };
-        assert_eq!(selection_vector.len(), actions.len());
+        if data_selection_vector.len() != actions.len() {
+            return Err(crate::Error::internal_error(
+                "Data skipping filter returned incorrect number of rows",
+            ));
+        }
+
+        let partition_selection_vector = match &self.partition_filter {
+            Some(filter) => filter.apply(actions)?,
+            None => vec![true; actions.len()],
+        };
+        if partition_selection_vector.len() != actions.len() {
+            return Err(crate::Error::internal_error(
+                "Partition skipping filter returned incorrect number of rows",
+            ));
+        }
+
+        let selection_vector = data_selection_vector
+            .iter()
+            .zip(partition_selection_vector.iter())
+            .map(|(a, b)| *a && *b)
+            .collect();
 
         let mut visitor = AddRemoveDedupVisitor {
             seen: &mut self.seen,
@@ -298,8 +332,9 @@ pub(crate) fn scan_action_iter(
     logical_schema: SchemaRef,
     transform: Option<Arc<Transform>>,
     physical_predicate: Option<(ExpressionRef, SchemaRef)>,
+    partition_columns: &[String],
 ) -> impl Iterator<Item = DeltaResult<ScanData>> {
-    let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
+    let mut log_scanner = LogReplayScanner::new(engine, physical_predicate, partition_columns);
     let add_transform = engine.get_expression_handler().get_evaluator(
         get_log_add_schema().clone(),
         get_add_transform_expr(),
@@ -394,6 +429,7 @@ mod tests {
             logical_schema,
             None,
             None,
+            &[],
         );
         for res in iter {
             let (_batch, _sel, transforms) = res.unwrap();
@@ -417,6 +453,7 @@ mod tests {
             schema,
             static_transform,
             None,
+            &[],
         );
 
         fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 418e289fbe..c9dff8f809 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -28,6 +28,7 @@ use self::state::GlobalScanState;
 
 pub(crate) mod data_skipping;
 pub mod log_replay;
+pub(crate) mod partition_skipping;
 pub mod state;
 
 /// Builder to scan a snapshot of a table.
@@ -408,6 +409,7 @@ impl Scan {
             self.logical_schema.clone(),
             static_transform,
             physical_predicate,
+            &self.snapshot.metadata().partition_columns,
         );
         Ok(Some(it).into_iter().flatten())
     }
@@ -813,6 +815,7 @@ pub(crate) mod test_utils {
             logical_schema,
             transform,
             None,
+            &[],
         );
         let mut batch_count = 0;
         for res in iter {
diff --git a/kernel/src/scan/partition_skipping.rs b/kernel/src/scan/partition_skipping.rs
new file mode 100644
index 0000000000..ad6e5ff522
--- /dev/null
+++ b/kernel/src/scan/partition_skipping.rs
@@ -0,0 +1,224 @@
+use std::{
+    collections::HashMap,
+    sync::{Arc, LazyLock},
+};
+
+use tracing::debug;
+
+use crate::schema::column_name;
+use crate::{
+    engine_data::{ColumnNamesAndTypes, GetData},
+    expressions::Scalar,
+    predicates::{DefaultPredicateEvaluator, PredicateEvaluator},
+    scan::get_log_add_schema,
+    schema::{ColumnName, DataType, MapType, SchemaRef},
+    DeltaResult, Engine, EngineData, Expression, ExpressionEvaluator, ExpressionRef, RowVisitor,
+};
+use crate::{expressions::column_expr, schema::StructType};
+
+pub(crate) struct PartitionSkippingFilter {
+    evaluator: Arc<dyn ExpressionEvaluator>,
+    predicate: Arc<Expression>,
+    schema: SchemaRef,
+}
+
+impl PartitionSkippingFilter {
+    pub(crate) fn new(
+        engine: &dyn Engine,
+        physical_predicate: Option<(ExpressionRef, SchemaRef)>,
+        partition_columns: &[String],
+    ) -> Option<Self> {
+        static PARTITIONS_EXPR: LazyLock<Expression> =
+            LazyLock::new(|| column_expr!("add.partitionValues"));
+
+        let (predicate, schema) = physical_predicate?;
+        debug!("Creating a partition skipping filter for {:#?}", predicate);
+
+        // Limit the schema passed to the row visitor to only the fields that are included
+        // in the predicate and are also partition columns. The data skipping columns will
+        // be handled elsewhere.
+        let mut partition_fields = schema
+            .fields()
+            .filter(|f| partition_columns.contains(f.name()))
+            .cloned()
+            .peekable();
+        partition_fields.peek()?;
+        let schema = Arc::new(StructType::new(partition_fields));
+
+        let partitions_map_type = MapType::new(DataType::STRING, DataType::STRING, true);
+
+        let evaluator = engine.get_expression_handler().get_evaluator(
+            get_log_add_schema().clone(),
+            PARTITIONS_EXPR.clone(),
+            partitions_map_type.into(),
+        );
+
+        Some(Self {
+            evaluator,
+            predicate,
+            schema,
+        })
+    }
+
+    pub(crate) fn apply(&self, actions: &dyn EngineData) -> DeltaResult<Vec<bool>> {
+        let partitions = self.evaluator.evaluate(actions)?;
+        assert_eq!(partitions.len(), actions.len());
+
+        let mut visitor = PartitionVisitor::new(&self.predicate, &self.schema);
+        visitor.visit_rows_of(partitions.as_ref())?;
+        Ok(visitor.selection_vector.clone())
+    }
+}
+
+struct PartitionVisitor {
+    pub(crate) selection_vector: Vec<bool>,
+    predicate: Arc<Expression>,
+    schema: SchemaRef,
+}
+
+impl PartitionVisitor {
+    pub(crate) fn new(predicate: &Arc<Expression>, schema: &SchemaRef) -> Self {
+        Self {
+            selection_vector: Vec::default(),
+            predicate: Arc::clone(predicate),
+            schema: Arc::clone(schema),
+        }
+    }
+}
+
+impl RowVisitor for PartitionVisitor {
+    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
+        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
+            (
+                vec![column_name!("output")],
+                vec![DataType::Map(Box::new(MapType::new(
+                    DataType::STRING,
+                    DataType::STRING,
+                    true,
+                )))],
+            )
+                .into()
+        });
+        NAMES_AND_TYPES.as_ref()
+    }
+
+    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
+        let getter = getters[0];
+        for i in 0..row_count {
+            let val: Option<DeltaResult<bool>> = getter.get_map(i, "output")?.map(|m| {
+                let partition_values = m.materialize();
+                let resolver = self.schema.fields()
+                    .map(|field| {
+                        let data_type = field.data_type();
+
+                        let DataType::Primitive(primitive_type) = data_type else {
+                            return Err(crate::Error::unsupported(
+                                format!("Partition filtering is only supported for primitive types. Found type {} for field {}", data_type, field.name())
+                            ));
+                        };
+
+                        // Note: resolve the parsed value first; only fall back to the
+                        // nullability check when the partition value is actually absent.
+                        let scalar = match partition_values
+                            .get(field.name())
+                            .map(|v| primitive_type.parse_scalar(v))
+                            .transpose()?
+                        {
+                            Some(scalar) => scalar,
+                            None if field.nullable => Scalar::Null(data_type.clone()),
+                            None => {
+                                return Err(crate::Error::missing_data(format!(
+                                    "Missing partition value for non-nullable field {} is not supported",
+                                    field.name
+                                )))
+                            }
+                        };
+
+                        Ok((ColumnName::new([field.name()]), scalar))
+                    })
+                    .collect::<DeltaResult<HashMap<ColumnName, Scalar>>>()?;
+
+                let filter = DefaultPredicateEvaluator::from(resolver);
+                Ok(filter.eval_expr(&self.predicate, false).unwrap_or(true))
+            });
+
+            self.selection_vector.push(val.transpose()?.unwrap_or(true));
+        }
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow_array::{RecordBatch, StringArray};
+    use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
+    use std::sync::Arc;
+
+    use crate::engine::{arrow_data::ArrowEngineData, sync::SyncEngine};
+    use crate::expressions::UnaryOperator;
+    use crate::scan::get_log_schema;
+    use crate::schema::{DataType, Schema, StructField};
+    use crate::{DeltaResult, Engine, EngineData, Expression};
+
+    use super::PartitionSkippingFilter;
+
+    // TODO(nick): Merge all copies of this into one "test utils" thing
+    fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
+        let string_field = Arc::new(ArrowField::new("a", ArrowDataType::Utf8, true));
+        let schema = Arc::new(ArrowSchema::new(vec![string_field]));
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array)])
+            .expect("Can't convert to record batch");
+        Box::new(ArrowEngineData::new(batch))
+    }
+
+    #[test]
+    fn test_partition_skipping() -> DeltaResult<()> {
+        let engine = SyncEngine::new();
+        let json_handler = engine.get_json_handler();
+        let json_strings: StringArray = vec![
+            // All of these files should be filtered out due to their c1 value
+            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"1","c2":""},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
+            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"2","c2":null},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
+            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"3","c2":"a"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
+
+            // Test that both null and "" produce valid nulls
+            r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"4","c2":""},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#,
r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5","c2":null},"size":452,"modificationTime":1670892998136,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#, + r#"{"add":{"path":"c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet","partitionValues":{"c1":"6","c2":"b"},"size":452,"modificationTime":1670892998137,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":4},\"maxValues\":{\"c3\":4},\"nullCount\":{\"c3\":0}}"}}"#, + + // Gracefully handle missing partition values as null + r#"{"add":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","partitionValues":{"c1":"1"},"size":452,"modificationTime":1670892998135,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":5},\"maxValues\":{\"c3\":5},\"nullCount\":{\"c3\":0}}"}}"#, + r#"{"add":{"path":"c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet","partitionValues":{"c1":"5"},"size":452,"modificationTime":1670892998136,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c3\":6},\"maxValues\":{\"c3\":6},\"nullCount\":{\"c3\":0}}"}}"#, + + ] + .into(); + + let log_schema = get_log_schema().clone(); + let batch = json_handler + .parse_json( + string_array_to_engine_data(json_strings), + log_schema.clone(), + ) + .unwrap(); + + let expr = Arc::new(Expression::and( + Expression::unary(UnaryOperator::IsNull, Expression::column(["c2"])), + Expression::ge(Expression::column(["c1"]), Expression::literal(4)), + )); + + let schema = Arc::new(Schema::new(vec![ + StructField::new("c1", DataType::INTEGER, true), + StructField::new("c2", DataType::STRING, true), + StructField::new("c3", DataType::INTEGER, true), + ])); + + let physical_predicate = Some((expr, schema)); + let filter = PartitionSkippingFilter::new( + &engine, + physical_predicate, + &["c1".to_string(), "c2".to_string()], + ) + .expect("Unable to create Partition Skipping Filter"); + + let actual = filter.apply(batch.as_ref())?; + + let expected = vec![false, false, false, true, true, false, false, true]; + + assert_eq!(actual, expected); + Ok(()) + } +}