diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs
index 86f5e7e5f..5d3b5047b 100644
--- a/ffi/src/scan.rs
+++ b/ffi/src/scan.rs
@@ -230,7 +230,7 @@ fn kernel_scan_data_next_impl(
         .data
         .lock()
         .map_err(|_| Error::generic("poisoned mutex"))?;
-    if let Some((data, sel_vec)) = data.next().transpose()? {
+    if let Some((data, sel_vec, _transforms)) = data.next().transpose()? {
         let bool_slice = KernelBoolSlice::from(sel_vec);
         (engine_visitor)(engine_context, data.into(), bool_slice);
         Ok(true)
diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs
index 194530004..01b1c4e88 100644
--- a/kernel/examples/inspect-table/src/main.rs
+++ b/kernel/examples/inspect-table/src/main.rs
@@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> {
             let scan = ScanBuilder::new(snapshot).build()?;
             let scan_data = scan.scan_data(&engine)?;
             for res in scan_data {
-                let (data, vector) = res?;
+                let (data, vector, _transforms) = res?;
                 delta_kernel::scan::state::visit_scan_files(
                     data.as_ref(),
                     &vector,
diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs
index e689a4ef4..57011dcc9 100644
--- a/kernel/examples/read-table-multi-threaded/src/main.rs
+++ b/kernel/examples/read-table-multi-threaded/src/main.rs
@@ -210,7 +210,7 @@ fn try_main() -> DeltaResult<()> {
     drop(record_batch_tx);
 
     for res in scan_data {
-        let (data, vector) = res?;
+        let (data, vector, _transforms) = res?;
         scan_file_tx = delta_kernel::scan::state::visit_scan_files(
             data.as_ref(),
             &vector,
diff --git a/kernel/src/engine_data.rs b/kernel/src/engine_data.rs
index 25a7e84bd..333ced827 100644
--- a/kernel/src/engine_data.rs
+++ b/kernel/src/engine_data.rs
@@ -129,7 +129,9 @@ pub trait TypedGetData<'a, T> {
     fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<T>>;
     fn get(&'a self, row_index: usize, field_name: &str) -> DeltaResult<T> {
         let val = self.get_opt(row_index, field_name)?;
-        val.ok_or_else(|| Error::MissingData(format!("Data missing for field {field_name}")))
+        val.ok_or_else(|| {
+            Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
+        })
     }
 }
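The call sites above simply ignore the new third tuple element via `_transforms`. A consumer that actually wants logical data would pick it up instead; a minimal sketch of that loop follows (only the tuple shape comes from this change, the iteration and row-skipping logic is illustrative):

```rust
use delta_kernel::scan::Scan;
use delta_kernel::{DeltaResult, Engine};

// Sketch of a consumer that inspects the new per-row transforms rather than
// discarding them with `_transforms`.
fn walk_scan_data(scan: &Scan, engine: &dyn Engine) -> DeltaResult<()> {
    for res in scan.scan_data(engine)? {
        let (_data, selection_vector, transforms) = res?;
        for row in 0..selection_vector.len() {
            if !selection_vector[row] {
                continue; // non-selected rows must be ignored
            }
            // A missing entry (or an explicit `None`) means the data read for this
            // row's file is already in its logical shape.
            if let Some(Some(_expr)) = transforms.get(row) {
                // evaluate `_expr` against the physical data read for this row's
                // file to produce data matching `scan.schema()`
            }
        }
    }
    Ok(())
}
```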
diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs
index d7f83a4fa..33bc87075 100644
--- a/kernel/src/scan/log_replay.rs
+++ b/kernel/src/scan/log_replay.rs
@@ -1,15 +1,16 @@
 use std::clone::Clone;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::sync::{Arc, LazyLock};
 
+use itertools::Itertools;
 use tracing::debug;
 
 use super::data_skipping::DataSkippingFilter;
-use super::ScanData;
+use super::{ScanData, Transform};
 use crate::actions::get_log_add_schema;
 use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
 use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef};
-use crate::scan::DeletionVectorDescriptor;
+use crate::scan::{DeletionVectorDescriptor, TransformExpr};
 use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
 use crate::utils::require;
 use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator};
@@ -44,12 +45,17 @@ struct LogReplayScanner {
 struct AddRemoveDedupVisitor<'seen> {
     seen: &'seen mut HashSet<FileActionKey>,
     selection_vector: Vec<bool>,
+    logical_schema: SchemaRef,
+    transform: Option<Arc<Transform>>,
+    row_transform_exprs: Vec<Option<ExpressionRef>>,
     is_log_batch: bool,
 }
 
 impl AddRemoveDedupVisitor<'_> {
     /// Checks if log replay already processed this logical file (in which case the current action
     /// should be ignored). If not already seen, register it so we can recognize future duplicates.
+    /// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
+    /// and should process it.
     fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
         // Note: each (add.path + add.dv_unique_id()) pair has a
         // unique Add + Remove pair in the log. For example:
@@ -76,6 +82,37 @@ impl AddRemoveDedupVisitor<'_> {
         }
     }
 
+    /// Compute an expression that will transform the data from physical to logical for a given
+    /// `Add` file action.
+    fn get_transform_expr<'a>(
+        &self,
+        i: usize,
+        transform: &Transform,
+        getters: &[&'a dyn GetData<'a>],
+    ) -> DeltaResult<ExpressionRef> {
+        let partition_values: HashMap<_, _> = getters[1].get(i, "add.partitionValues")?;
+        let transforms = transform
+            .iter()
+            .map(|transform_expr| match transform_expr {
+                TransformExpr::Partition(field_idx) => {
+                    let field = self.logical_schema.fields.get_index(*field_idx);
+                    let Some((_, field)) = field else {
+                        return Err(Error::Generic(
+                            format!("logical schema did not contain expected field at {field_idx}, can't transform data")
+                        ));
+                    };
+                    let name = field.physical_name();
+                    let partition_value = super::parse_partition_value(
+                        partition_values.get(name),
+                        field.data_type(),
+                    )?;
+                    Ok(partition_value.into())
+                }
+                TransformExpr::Static(field_expr) => Ok(field_expr.clone()),
+            })
+            .try_collect()?;
+        Ok(Arc::new(Expression::Struct(transforms)))
+    }
+
     /// True if this row contains an Add action that should survive log replay. Skip it if the row
     /// is not an Add action, or the file has already been seen previously.
     fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<bool> {
@@ -83,11 +120,11 @@ impl AddRemoveDedupVisitor<'_> {
         // have a remove with a path at index 4. In either case, extract the three dv getters at
         // indexes that immediately follow a valid path index.
         let (path, dv_getters, is_add) = if let Some(path) = getters[0].get_str(i, "add.path")? {
-            (path, &getters[1..4], true)
+            (path, &getters[2..5], true)
         } else if !self.is_log_batch {
             return Ok(false);
-        } else if let Some(path) = getters[4].get_opt(i, "remove.path")? {
-            (path, &getters[5..8], false)
+        } else if let Some(path) = getters[5].get_opt(i, "remove.path")? {
+            (path, &getters[6..9], false)
         } else {
             return Ok(false);
         };
@@ -101,9 +138,22 @@ impl AddRemoveDedupVisitor<'_> {
             None => None,
         };
 
-        // Process both adds and removes, but only return not already-seen adds
+        // Check both adds and removes (skipping already-seen), but only transform and return adds
        let file_key = FileActionKey::new(path, dv_unique_id);
-        Ok(!self.check_and_record_seen(file_key) && is_add)
+        if self.check_and_record_seen(file_key) || !is_add {
+            return Ok(false);
+        }
+        let transform = self
+            .transform
+            .as_ref()
+            .map(|transform| self.get_transform_expr(i, transform, getters))
+            .transpose()?;
+        if transform.is_some() {
+            // fill in any needed `None`s for previous rows
+            self.row_transform_exprs.resize_with(i, Default::default);
+            self.row_transform_exprs.push(transform);
+        }
+        Ok(true)
     }
 }
 
@@ -113,8 +163,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
     static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
         const STRING: DataType = DataType::STRING;
         const INTEGER: DataType = DataType::INTEGER;
+        let ss_map: DataType = MapType::new(STRING, STRING, true).into();
         let types_and_names = vec![
             (STRING, column_name!("add.path")),
+            (ss_map, column_name!("add.partitionValues")),
             (STRING, column_name!("add.deletionVector.storageType")),
             (STRING, column_name!("add.deletionVector.pathOrInlineDv")),
             (INTEGER, column_name!("add.deletionVector.offset")),
@@ -132,12 +184,12 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
         } else {
             // All checkpoint actions are already reconciled and Remove actions in checkpoint files
             // only serve as tombstones for vacuum jobs. So we only need to examine the adds here.
-            (&names[..4], &types[..4])
+            (&names[..5], &types[..5])
         }
     }
 
     fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
-        let expected_getters = if self.is_log_batch { 8 } else { 4 };
+        let expected_getters = if self.is_log_batch { 9 } else { 5 };
         require!(
             getters.len() == expected_getters,
             Error::InternalError(format!(
@@ -207,6 +259,8 @@ impl LogReplayScanner {
         &mut self,
         add_transform: &dyn ExpressionEvaluator,
         actions: &dyn EngineData,
+        logical_schema: SchemaRef,
+        transform: Option<Arc<Transform>>,
         is_log_batch: bool,
     ) -> DeltaResult<ScanData> {
         // Apply data skipping to get back a selection vector for actions that passed skipping. We
@@ -220,6 +274,9 @@ impl LogReplayScanner {
         let mut visitor = AddRemoveDedupVisitor {
             seen: &mut self.seen,
             selection_vector,
+            logical_schema,
+            transform,
+            row_transform_exprs: Vec::new(),
             is_log_batch,
         };
         visitor.visit_rows_of(actions)?;
@@ -227,7 +284,7 @@ impl LogReplayScanner {
         // TODO: Teach expression eval to respect the selection vector we just computed so carefully!
         let selection_vector = visitor.selection_vector;
         let result = add_transform.evaluate(actions)?;
-        Ok((result, selection_vector))
+        Ok((result, selection_vector, visitor.row_transform_exprs))
     }
 }
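The `resize_with` above is what keeps the per-row transform vector sparse: it only grows when a selected add actually carries a transform, so the vector may end up shorter than the batch and holds `None` at every index in between. A self-contained toy model of that convention, using `String` in place of `ExpressionRef`:

```rust
// Mirrors how AddRemoveDedupVisitor fills `row_transform_exprs`: pad with `None`
// up to the current row index, then push the transform for that row.
fn record_transform(row_transforms: &mut Vec<Option<String>>, row: usize, expr: Option<String>) {
    if expr.is_some() {
        // fill in any needed `None`s for previous rows
        row_transforms.resize_with(row, Default::default);
        row_transforms.push(expr);
    }
}

fn main() {
    let mut rows = Vec::new();
    record_transform(&mut rows, 1, Some("expr for row 1".to_string()));
    record_transform(&mut rows, 3, Some("expr for row 3".to_string()));
    // Rows 0 and 2 never got a transform; rows past 3 don't even have entries.
    assert_eq!(rows.len(), 4);
    assert!(rows[0].is_none() && rows[2].is_none());
    assert!(rows[1].is_some() && rows[3].is_some());
}
```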
@@ -235,9 +292,11 @@ impl LogReplayScanner {
 /// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
 /// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag
 /// indicates whether the record batch is a log or checkpoint batch.
-pub fn scan_action_iter(
+pub(crate) fn scan_action_iter(
     engine: &dyn Engine,
     action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
+    logical_schema: SchemaRef,
+    transform: Option<Arc<Transform>>,
     physical_predicate: Option<(ExpressionRef, SchemaRef)>,
 ) -> impl Iterator<Item = DeltaResult<ScanData>> {
     let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
@@ -249,20 +308,37 @@ pub fn scan_action_iter(
     action_iter
         .map(move |action_res| {
             let (batch, is_log_batch) = action_res?;
-            log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch)
+            log_scanner.process_scan_batch(
+                add_transform.as_ref(),
+                batch.as_ref(),
+                logical_schema.clone(),
+                transform.clone(),
+                is_log_batch,
+            )
         })
-        .filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))
+        .filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true)))
 }
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
+    use std::{collections::HashMap, sync::Arc};
 
-    use crate::scan::{
-        state::{DvInfo, Stats},
-        test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
+    use crate::expressions::{column_name, Scalar};
+    use crate::scan::state::{DvInfo, Stats};
+    use crate::scan::test_utils::{
+        add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
+        run_with_validate_callback,
+    };
+    use crate::scan::{get_state_info, Scan};
+    use crate::Expression;
+    use crate::{
+        engine::sync::SyncEngine,
+        schema::{DataType, SchemaRef, StructField, StructType},
+        ExpressionRef,
     };
 
+    use super::scan_action_iter;
+
     // dv-info is more complex to validate, we validate that works in the test for visit_scan_files
     // in state.rs
     fn validate_simple(
@@ -288,6 +364,8 @@ mod tests {
     fn test_scan_action_iter() {
         run_with_validate_callback(
             vec![add_batch_simple()],
+            None, // not testing schema
+            None, // not testing transform
             &[true, false],
             (),
             validate_simple,
@@ -298,9 +376,80 @@ mod tests {
     fn test_scan_action_iter_with_remove() {
         run_with_validate_callback(
             vec![add_batch_with_remove()],
+            None, // not testing schema
+            None, // not testing transform
             &[false, false, true, false],
             (),
             validate_simple,
         );
     }
+
+    #[test]
+    fn test_no_transforms() {
+        let batch = vec![add_batch_simple()];
+        let logical_schema = Arc::new(crate::schema::StructType::new(vec![]));
+        let iter = scan_action_iter(
+            &SyncEngine::new(),
+            batch.into_iter().map(|batch| Ok((batch as _, true))),
+            logical_schema,
+            None,
+            None,
+        );
+        for res in iter {
+            let (_batch, _sel, transforms) = res.unwrap();
+            assert!(transforms.is_empty(), "Should have no transforms");
+        }
+    }
+
+    #[test]
+    fn test_simple_transform() {
+        let schema: SchemaRef = Arc::new(StructType::new([
+            StructField::new("value", DataType::INTEGER, true),
+            StructField::new("date", DataType::DATE, true),
+        ]));
+        let partition_cols = ["date".to_string()];
+        let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap();
+        let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields)));
+        let batch = vec![add_batch_with_partition_col()];
+        let iter = scan_action_iter(
+            &SyncEngine::new(),
+            batch.into_iter().map(|batch| Ok((batch as _, true))),
+            schema,
+            static_transform,
+            None,
+        );
+
+        fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
+            assert!(transform.is_some());
+            let Expression::Struct(inner) = transform.unwrap().as_ref() else {
+                panic!("Transform should always be a struct expr");
+            };
+            assert_eq!(inner.len(), 2, "expected two items in transform struct");
+
+            let Expression::Column(ref name) = inner[0] else {
+                panic!("Expected first expression to be a column");
+            };
+            assert_eq!(name, &column_name!("value"), "First col should be 'value'");
+
+            let Expression::Literal(ref scalar) = inner[1] else {
+                panic!("Expected second expression to be a literal");
+            };
+            assert_eq!(
+                scalar,
+                &Scalar::Date(expected_date_offset),
+                "Didn't get expected date offset"
+            );
+        }
+
+        for res in iter {
+            let (_batch, _sel, transforms) = res.unwrap();
+            // this batch has a metadata action first and a protocol action third, so we expect
+            // four items, with the first and third being `None`
+            assert_eq!(transforms.len(), 4, "Should have 4 transforms");
+            assert!(transforms[0].is_none(), "transform at [0] should be None");
+            assert!(transforms[2].is_none(), "transform at [2] should be None");
+            validate_transform(transforms[1].as_ref(), 17511);
+            validate_transform(transforms[3].as_ref(), 17510);
+        }
+    }
 }
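These tests only check the shape of the generated expression; actually applying one is ordinary expression evaluation through the engine's expression handler. A sketch of what that might look like, assuming the handler API as of this version (`get_expression_handler`/`get_evaluator`) and that `DataType` can be built from a cloned `StructType`; the helper itself is illustrative and not part of this change:

```rust
use delta_kernel::schema::SchemaRef;
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef};

// Illustrative: evaluate a per-row transform over one physical batch to get
// data in the logical schema (which is guaranteed to be `Scan.schema()`).
fn apply_transform(
    engine: &dyn Engine,
    transform: &ExpressionRef,
    physical_schema: SchemaRef,
    logical_schema: SchemaRef,
    physical_data: &dyn EngineData,
) -> DeltaResult<Box<dyn EngineData>> {
    let evaluator = engine.get_expression_handler().get_evaluator(
        physical_schema,
        transform.as_ref().clone(),
        logical_schema.as_ref().clone().into(), // output type: the logical struct
    );
    evaluator.evaluate(physical_data)
}
```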
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 53c6d4cae..418e289fb 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -301,7 +301,20 @@ pub enum ColumnType {
     Partition(usize),
 }
 
-pub type ScanData = (Box<dyn EngineData>, Vec<bool>);
+/// A transform is ultimately a `Struct` expression. This holds the set of expressions that make
+/// up that struct expression.
+type Transform = Vec<TransformExpr>;
+
+/// Transforms aren't computed all at once: static parts can go straight to an `Expression`, but
+/// things like partition columns need to be filled in per file. This enum holds an expression
+/// that's part of a `Transform`.
+pub(crate) enum TransformExpr {
+    Static(Expression),
+    Partition(usize),
+}
+
+// TODO(nick): Make this a struct in a follow-on PR
+// (data, deletion_vec, transforms)
+pub type ScanData = (Box<dyn EngineData>, Vec<bool>, Vec<Option<ExpressionRef>>);
 
 /// The result of building a scan over a table. This can be used to get the actual data from
 /// scanning the table.
@@ -340,6 +353,21 @@ impl Scan {
         }
     }
 
+    /// Convert the parts of the transform that can be computed statically into `Expression`s. For
+    /// parts that cannot be computed statically, include enough metadata so lower levels of
+    /// processing can create and fill in an expression.
+    fn get_static_transform(all_fields: &[ColumnType]) -> Transform {
+        all_fields
+            .iter()
+            .map(|field| match field {
+                ColumnType::Selected(col_name) => {
+                    TransformExpr::Static(ColumnName::new([col_name]).into())
+                }
+                ColumnType::Partition(idx) => TransformExpr::Partition(*idx),
+            })
+            .collect()
+    }
+
     /// Get an iterator of [`EngineData`]s that should be included in scan for a query. This handles
     /// log-replay, reconciling Add and Remove actions, and applying data skipping (if
     /// possible). Each item in the returned iterator is a tuple of:
@@ -352,11 +380,23 @@ impl Scan {
     /// the query. NB: If you are using the default engine and plan to call arrow's
     /// `filter_record_batch`, you _need_ to extend this vector to the full length of the batch or
     /// arrow will drop the extra rows.
+    /// - `Vec<Option<ExpressionRef>>`: Transformation expressions that need to be applied. For each
+    ///   row at index `i` in the above data, if an expression exists at index `i` in the `Vec`,
+    ///   the associated expression _must_ be applied to the data read from the file specified by
+    ///   the row. The resultant schema for this expression is guaranteed to be `Scan.schema()`. If
+    ///   the item at index `i` in this `Vec` is `None`, or if the `Vec` contains fewer than `i + 1`
+    ///   elements, no expression need be applied and the data read from disk is already in the
+    ///   correct logical state.
     pub fn scan_data(
         &self,
         engine: &dyn Engine,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>>> {
-        // NOTE: This is a cheap arc clone
+        // Compute the static part of the transformation. This is `None` if no transformation is
+        // needed (currently that just means no partition columns and no column mapping, but it
+        // will be extended to other transforms as we support them)
+        let static_transform = (self.have_partition_cols
+            || self.snapshot.column_mapping_mode != ColumnMappingMode::None)
+            .then_some(Arc::new(Scan::get_static_transform(&self.all_fields)));
         let physical_predicate = match self.physical_predicate.clone() {
             PhysicalPredicate::StaticSkipAll => return Ok(None.into_iter().flatten()),
             PhysicalPredicate::Some(predicate, schema) => Some((predicate, schema)),
@@ -365,6 +405,8 @@ impl Scan {
         let it = scan_action_iter(
             engine,
             self.replay_for_scan_data(engine)?,
+            self.logical_schema.clone(),
+            static_transform,
             physical_predicate,
         );
         Ok(Some(it).into_iter().flatten())
@@ -445,7 +487,7 @@ impl Scan {
         let scan_data = self.scan_data(engine.as_ref())?;
         let scan_files_iter = scan_data
             .map(|res| {
-                let (data, vec) = res?;
+                let (data, vec, _transforms) = res?;
                 let scan_files = vec![];
                 state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)
             })
@@ -688,10 +730,11 @@ pub(crate) mod test_utils {
             sync::{json::SyncJsonHandler, SyncEngine},
         },
         scan::log_replay::scan_action_iter,
+        schema::SchemaRef,
         EngineData, JsonHandler,
     };
 
-    use super::state::ScanCallback;
+    use super::{state::ScanCallback, Transform};
 
     // TODO(nick): Merge all copies of this into one "test utils" thing
     fn string_array_to_engine_data(string_array: StringArray) -> Box<dyn EngineData> {
@@ -734,21 +777,46 @@ pub(crate) mod test_utils {
         ArrowEngineData::try_from_engine_data(parsed).unwrap()
     }
 
+    // add batch with a `date` partition col
+    pub(crate) fn add_batch_with_partition_col() -> Box<dyn EngineData> {
+        let handler = SyncJsonHandler {};
+        let json_strings: StringArray = vec![
+            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
+            r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues": {"date": "2017-12-11"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
+            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
+            r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+        ]
+        .into();
+        let output_schema = get_log_schema().clone();
+        let parsed = handler
+            .parse_json(string_array_to_engine_data(json_strings), output_schema)
+            .unwrap();
+        ArrowEngineData::try_from_engine_data(parsed).unwrap()
+    }
+
+    /// Create a scan action iter and validate what's called back. If you pass `None` as
+    /// `logical_schema`, `transform` should also be `None`.
     #[allow(clippy::vec_box)]
     pub(crate) fn run_with_validate_callback<T: Clone>(
         batch: Vec<Box<dyn EngineData>>,
+        logical_schema: Option<SchemaRef>,
+        transform: Option<Arc<Transform>>,
         expected_sel_vec: &[bool],
         context: T,
         validate_callback: ScanCallback<T>,
     ) {
+        let logical_schema =
+            logical_schema.unwrap_or_else(|| Arc::new(crate::schema::StructType::new(vec![])));
         let iter = scan_action_iter(
             &SyncEngine::new(),
             batch.into_iter().map(|batch| Ok((batch as _, true))),
+            logical_schema,
+            transform,
             None,
         );
         let mut batch_count = 0;
         for res in iter {
-            let (batch, sel) = res.unwrap();
+            let (batch, sel, _transforms) = res.unwrap();
             assert_eq!(sel, expected_sel_vec);
             crate::scan::state::visit_scan_files(
                 batch.as_ref(),
@@ -959,7 +1027,7 @@ mod tests {
         }
         let mut files = vec![];
         for data in scan_data {
-            let (data, vec) = data?;
+            let (data, vec, _transforms) = data?;
             files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?;
         }
         Ok(files)
diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs
index b57f0c120..085af15ec 100644
--- a/kernel/src/scan/state.rs
+++ b/kernel/src/scan/state.rs
@@ -250,6 +250,8 @@ mod tests {
         let context = TestContext { id: 2 };
         run_with_validate_callback(
             vec![add_batch_simple()],
+            None, // not testing schema
+            None, // not testing transform
             &[true, false],
             context,
             validate_visit,
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index 46a72d309..0ef3c6f1b 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -369,7 +369,7 @@ fn read_with_scan_data(
     let scan_data = scan.scan_data(engine)?;
     let mut scan_files = vec![];
    for data in scan_data {
-        let (data, vec) = data?;
+        let (data, vec, _transforms) = data?;
         scan_files = visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
     }
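A note on the magic numbers in `test_simple_transform`: `Scalar::Date` stores days since the Unix epoch, so the expected `17511`/`17510` are just the partition strings `2017-12-11` and `2017-12-10` from `add_batch_with_partition_col` parsed as dates. A quick standalone check of that arithmetic with chrono, independent of the kernel's `parse_partition_value`:

```rust
use chrono::NaiveDate;

// Sanity-check the day offsets asserted in test_simple_transform.
fn main() {
    let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
    let d1 = NaiveDate::from_ymd_opt(2017, 12, 11).unwrap();
    let d2 = NaiveDate::from_ymd_opt(2017, 12, 10).unwrap();
    assert_eq!((d1 - epoch).num_days(), 17511);
    assert_eq!((d2 - epoch).num_days(), 17510);
}
```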