-
Notifications
You must be signed in to change notification settings - Fork 55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Part 1, Read transforms via expressions: Just compute the expression and return it. #607
base: main
Are you sure you want to change the base?
Changes from all commits
6e0b1c9
c53b7de
9ac7173
f75b2e3
4d1d4f7
c8cc84b
29ded0e
9d4688c
631f403
f791167
b7268e5
da5a9e8
e3fdfaa
e9a8d1c
b773614
ebcb42d
3a38785
58ad2a3
3d040f7
999143d
de5c18a
d5e55a4
a97f605
1ea159c
f6b81ac
7254704
fe68f0a
0ea983d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,16 @@ | ||
use std::clone::Clone; | ||
use std::collections::HashSet; | ||
use std::collections::{HashMap, HashSet}; | ||
use std::sync::{Arc, LazyLock}; | ||
|
||
use itertools::Itertools; | ||
use tracing::debug; | ||
|
||
use super::data_skipping::DataSkippingFilter; | ||
use super::ScanData; | ||
use super::{ScanData, Transform}; | ||
use crate::actions::get_log_add_schema; | ||
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; | ||
use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef}; | ||
use crate::scan::DeletionVectorDescriptor; | ||
use crate::scan::{DeletionVectorDescriptor, TransformExpr}; | ||
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType}; | ||
use crate::utils::require; | ||
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator}; | ||
|
@@ -44,12 +45,17 @@ struct LogReplayScanner { | |
struct AddRemoveDedupVisitor<'seen> { | ||
seen: &'seen mut HashSet<FileActionKey>, | ||
selection_vector: Vec<bool>, | ||
logical_schema: SchemaRef, | ||
transform: Option<Arc<Transform>>, | ||
OussamaSaoudi-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
row_transform_exprs: Vec<Option<ExpressionRef>>, | ||
is_log_batch: bool, | ||
} | ||
|
||
impl AddRemoveDedupVisitor<'_> { | ||
/// Checks if log replay already processed this logical file (in which case the current action | ||
/// should be ignored). If not already seen, register it so we can recognize future duplicates. | ||
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it | ||
/// and should process it. | ||
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool { | ||
// Note: each (add.path + add.dv_unique_id()) pair has a | ||
// unique Add + Remove pair in the log. For example: | ||
|
@@ -76,18 +82,49 @@ impl AddRemoveDedupVisitor<'_> { | |
} | ||
} | ||
|
||
/// Compute an expression that will transform from physical to logical for a given Add file action | ||
fn get_transform_expr<'a>( | ||
&self, | ||
i: usize, | ||
transform: &Transform, | ||
getters: &[&'a dyn GetData<'a>], | ||
) -> DeltaResult<ExpressionRef> { | ||
let partition_values: HashMap<_, _> = getters[1].get(i, "add.partitionValues")?; | ||
let transforms = transform | ||
.iter() | ||
.map(|transform_expr| match transform_expr { | ||
TransformExpr::Partition(field_idx) => { | ||
let field = self.logical_schema.fields.get_index(*field_idx); | ||
let Some((_, field)) = field else { | ||
return Err(Error::generic( | ||
"logical schema did not contain expected field, can't transform data", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mega nit but might be useful to log the index? |
||
)); | ||
}; | ||
let name = field.physical_name(); | ||
let partition_value = super::parse_partition_value( | ||
partition_values.get(name), | ||
field.data_type(), | ||
)?; | ||
Ok(partition_value.into()) | ||
} | ||
TransformExpr::Static(field_expr) => Ok(field_expr.clone()), | ||
}) | ||
.try_collect()?; | ||
Ok(Arc::new(Expression::Struct(transforms))) | ||
} | ||
|
||
/// True if this row contains an Add action that should survive log replay. Skip it if the row | ||
/// is not an Add action, or the file has already been seen previously. | ||
fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<bool> { | ||
// Add will have a path at index 0 if it is valid; otherwise, if it is a log batch, we may | ||
// have a remove with a path at index 4. In either case, extract the three dv getters at | ||
// indexes that immediately follow a valid path index. | ||
let (path, dv_getters, is_add) = if let Some(path) = getters[0].get_str(i, "add.path")? { | ||
(path, &getters[1..4], true) | ||
(path, &getters[2..5], true) | ||
} else if !self.is_log_batch { | ||
return Ok(false); | ||
} else if let Some(path) = getters[4].get_opt(i, "remove.path")? { | ||
(path, &getters[5..8], false) | ||
} else if let Some(path) = getters[5].get_opt(i, "remove.path")? { | ||
(path, &getters[6..9], false) | ||
} else { | ||
return Ok(false); | ||
}; | ||
|
@@ -101,9 +138,22 @@ impl AddRemoveDedupVisitor<'_> { | |
None => None, | ||
}; | ||
|
||
// Process both adds and removes, but only return not already-seen adds | ||
// Check both adds and removes (skipping already-seen), but only transform and return adds | ||
let file_key = FileActionKey::new(path, dv_unique_id); | ||
Ok(!self.check_and_record_seen(file_key) && is_add) | ||
if self.check_and_record_seen(file_key) || !is_add { | ||
return Ok(false); | ||
} | ||
let transform = self | ||
.transform | ||
.as_ref() | ||
.map(|transform| self.get_transform_expr(i, transform, getters)) | ||
.transpose()?; | ||
if transform.is_some() { | ||
// fill in any needed `None`s for previous rows | ||
self.row_transform_exprs.resize_with(i, Default::default); | ||
self.row_transform_exprs.push(transform); | ||
} | ||
Ok(true) | ||
} | ||
} | ||
|
||
|
@@ -113,8 +163,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { | |
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| { | ||
const STRING: DataType = DataType::STRING; | ||
const INTEGER: DataType = DataType::INTEGER; | ||
let ss_map: DataType = MapType::new(STRING, STRING, true).into(); | ||
let types_and_names = vec![ | ||
(STRING, column_name!("add.path")), | ||
(ss_map, column_name!("add.partitionValues")), | ||
(STRING, column_name!("add.deletionVector.storageType")), | ||
(STRING, column_name!("add.deletionVector.pathOrInlineDv")), | ||
(INTEGER, column_name!("add.deletionVector.offset")), | ||
|
@@ -132,12 +184,12 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> { | |
} else { | ||
// All checkpoint actions are already reconciled and Remove actions in checkpoint files | ||
// only serve as tombstones for vacuum jobs. So we only need to examine the adds here. | ||
(&names[..4], &types[..4]) | ||
(&names[..5], &types[..5]) | ||
} | ||
} | ||
|
||
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { | ||
let expected_getters = if self.is_log_batch { 8 } else { 4 }; | ||
let expected_getters = if self.is_log_batch { 9 } else { 5 }; | ||
require!( | ||
getters.len() == expected_getters, | ||
Error::InternalError(format!( | ||
|
@@ -207,6 +259,8 @@ impl LogReplayScanner { | |
&mut self, | ||
add_transform: &dyn ExpressionEvaluator, | ||
actions: &dyn EngineData, | ||
logical_schema: SchemaRef, | ||
transform: Option<Arc<Transform>>, | ||
is_log_batch: bool, | ||
) -> DeltaResult<ScanData> { | ||
// Apply data skipping to get back a selection vector for actions that passed skipping. We | ||
|
@@ -220,24 +274,29 @@ impl LogReplayScanner { | |
let mut visitor = AddRemoveDedupVisitor { | ||
seen: &mut self.seen, | ||
selection_vector, | ||
logical_schema, | ||
transform, | ||
row_transform_exprs: Vec::new(), | ||
is_log_batch, | ||
}; | ||
visitor.visit_rows_of(actions)?; | ||
|
||
// TODO: Teach expression eval to respect the selection vector we just computed so carefully! | ||
let selection_vector = visitor.selection_vector; | ||
let result = add_transform.evaluate(actions)?; | ||
Ok((result, selection_vector)) | ||
Ok((result, selection_vector, visitor.row_transform_exprs)) | ||
} | ||
} | ||
|
||
/// Given an iterator of (engine_data, bool) tuples and a predicate, returns an iterator of | ||
/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_ | ||
/// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag | ||
/// indicates whether the record batch is a log or checkpoint batch. | ||
pub fn scan_action_iter( | ||
pub(crate) fn scan_action_iter( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note this is a significant change as we not longer expose this function. In discussion so far we've agreed that it basically should never have been Open to discussion though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
engine: &dyn Engine, | ||
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>, | ||
logical_schema: SchemaRef, | ||
transform: Option<Arc<Transform>>, | ||
physical_predicate: Option<(ExpressionRef, SchemaRef)>, | ||
) -> impl Iterator<Item = DeltaResult<ScanData>> { | ||
let mut log_scanner = LogReplayScanner::new(engine, physical_predicate); | ||
|
@@ -249,20 +308,37 @@ pub fn scan_action_iter( | |
action_iter | ||
.map(move |action_res| { | ||
let (batch, is_log_batch) = action_res?; | ||
log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch) | ||
log_scanner.process_scan_batch( | ||
add_transform.as_ref(), | ||
batch.as_ref(), | ||
logical_schema.clone(), | ||
transform.clone(), | ||
is_log_batch, | ||
) | ||
}) | ||
.filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true))) | ||
.filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true))) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::collections::HashMap; | ||
use std::{collections::HashMap, sync::Arc}; | ||
|
||
use crate::scan::{ | ||
state::{DvInfo, Stats}, | ||
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback}, | ||
use crate::expressions::{column_name, Scalar}; | ||
use crate::scan::state::{DvInfo, Stats}; | ||
use crate::scan::test_utils::{ | ||
add_batch_simple, add_batch_with_partition_col, add_batch_with_remove, | ||
run_with_validate_callback, | ||
}; | ||
use crate::scan::{get_state_info, Scan}; | ||
use crate::Expression; | ||
use crate::{ | ||
engine::sync::SyncEngine, | ||
schema::{DataType, SchemaRef, StructField, StructType}, | ||
ExpressionRef, | ||
}; | ||
|
||
use super::scan_action_iter; | ||
|
||
// dv-info is more complex to validate, we validate that works in the test for visit_scan_files | ||
// in state.rs | ||
fn validate_simple( | ||
|
@@ -288,6 +364,8 @@ mod tests { | |
fn test_scan_action_iter() { | ||
run_with_validate_callback( | ||
vec![add_batch_simple()], | ||
None, // not testing schema | ||
None, // not testing transform | ||
&[true, false], | ||
(), | ||
validate_simple, | ||
|
@@ -298,9 +376,80 @@ mod tests { | |
fn test_scan_action_iter_with_remove() { | ||
run_with_validate_callback( | ||
vec![add_batch_with_remove()], | ||
None, // not testing schema | ||
None, // not testing transform | ||
&[false, false, true, false], | ||
(), | ||
validate_simple, | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_no_transforms() { | ||
let batch = vec![add_batch_simple()]; | ||
let logical_schema = Arc::new(crate::schema::StructType::new(vec![])); | ||
let iter = scan_action_iter( | ||
&SyncEngine::new(), | ||
batch.into_iter().map(|batch| Ok((batch as _, true))), | ||
logical_schema, | ||
None, | ||
None, | ||
); | ||
for res in iter { | ||
let (_batch, _sel, transforms) = res.unwrap(); | ||
assert!(transforms.is_empty(), "Should have no transforms"); | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_simple_transform() { | ||
let schema: SchemaRef = Arc::new(StructType::new([ | ||
StructField::new("value", DataType::INTEGER, true), | ||
StructField::new("date", DataType::DATE, true), | ||
])); | ||
let partition_cols = ["date".to_string()]; | ||
let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap(); | ||
let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields))); | ||
let batch = vec![add_batch_with_partition_col()]; | ||
let iter = scan_action_iter( | ||
&SyncEngine::new(), | ||
batch.into_iter().map(|batch| Ok((batch as _, true))), | ||
schema, | ||
static_transform, | ||
None, | ||
); | ||
|
||
fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) { | ||
assert!(transform.is_some()); | ||
let Expression::Struct(inner) = transform.unwrap().as_ref() else { | ||
panic!("Transform should always be a struct expr"); | ||
}; | ||
assert_eq!(inner.len(), 2, "expected two items in transform struct"); | ||
|
||
let Expression::Column(ref name) = inner[0] else { | ||
panic!("Expected first expression to be a column"); | ||
}; | ||
assert_eq!(name, &column_name!("value"), "First col should be 'value'"); | ||
|
||
let Expression::Literal(ref scalar) = inner[1] else { | ||
panic!("Expected second expression to be a literal"); | ||
}; | ||
assert_eq!( | ||
scalar, | ||
&Scalar::Date(expected_date_offset), | ||
"Didn't get expected date offset" | ||
); | ||
} | ||
|
||
for res in iter { | ||
let (_batch, _sel, transforms) = res.unwrap(); | ||
// in this case we have a metadata action first and protocol 3rd, so we expect 4 items, | ||
// the first and 3rd being a `None` | ||
assert_eq!(transforms.len(), 4, "Should have 4 transforms"); | ||
assert!(transforms[0].is_none(), "transform at [0] should be None"); | ||
assert!(transforms[2].is_none(), "transform at [2] should be None"); | ||
validate_transform(transforms[1].as_ref(), 17511); | ||
validate_transform(transforms[3].as_ref(), 17510); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intentional/permanent change? Or just for debugging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intentional, since this error occurs in more than one place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: I wonder if we should start adding some kind of "location code" as a (much) cheaper alternative to backtraces, that also stays stable as the code base evolves around it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that could work. I'm not too worried about perf for backtraces as they should only appear in error cases though