Part 1, Read transforms via expressions: Just compute the expression and return it. #607

Open
wants to merge 28 commits into
base: main
Changes from 15 commits
6e0b1c9
also extract partitionValues
nicklan Dec 9, 2024
c53b7de
checkpoint
nicklan Dec 11, 2024
9ac7173
Merge branch 'main' into transform-expr
nicklan Dec 12, 2024
f75b2e3
hey, it kinda works
nicklan Dec 12, 2024
4d1d4f7
Merge branch 'main' into transform-expr
nicklan Dec 17, 2024
c8cc84b
undo change to ColumnType, will go a different direction
nicklan Dec 17, 2024
29ded0e
use TransformExpr
nicklan Dec 18, 2024
9d4688c
cleanup
nicklan Dec 18, 2024
631f403
Merge branch 'main' into transform-expr
nicklan Dec 18, 2024
f791167
optional transform
nicklan Dec 18, 2024
b7268e5
add initial tests
nicklan Dec 18, 2024
da5a9e8
adjust comments
nicklan Dec 18, 2024
e3fdfaa
fix comment
nicklan Dec 18, 2024
e9a8d1c
oops, fix ffi
nicklan Dec 19, 2024
b773614
cleanup examples
nicklan Dec 19, 2024
ebcb42d
Actually use ExpressionRef
nicklan Dec 19, 2024
3a38785
Merge branch 'main' into transform-expr
nicklan Dec 19, 2024
58ad2a3
remove unused try_from
nicklan Dec 19, 2024
3d040f7
need transform if column mapping is enabled
nicklan Dec 19, 2024
999143d
move getting transform into own function
nicklan Jan 7, 2025
de5c18a
rename transforms
nicklan Jan 7, 2025
d5e55a4
address comments
nicklan Jan 7, 2025
a97f605
Merge branch 'main' into transform-expr
nicklan Jan 7, 2025
1ea159c
flatten imports
nicklan Jan 7, 2025
f6b81ac
switch to a Vec for transforms
nicklan Jan 9, 2025
7254704
Merge branch 'main' into transform-expr
nicklan Jan 10, 2025
fe68f0a
Merge branch 'main' into transform-expr
nicklan Jan 14, 2025
0ea983d
address comments
nicklan Jan 14, 2025
2 changes: 1 addition & 1 deletion ffi/src/scan.rs
@@ -230,7 +230,7 @@ fn kernel_scan_data_next_impl(
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec)) = data.next().transpose()? {
if let Some((data, sel_vec, _transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
(engine_visitor)(engine_context, data.into(), bool_slice);
Ok(true)
2 changes: 1 addition & 1 deletion kernel/examples/inspect-table/src/main.rs
@@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector) = res?;
let (data, vector, _transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
2 changes: 1 addition & 1 deletion kernel/examples/read-table-multi-threaded/src/main.rs
@@ -210,7 +210,7 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector) = res?;
let (data, vector, _transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
4 changes: 3 additions & 1 deletion kernel/src/engine_data.rs
@@ -129,7 +129,9 @@ pub trait TypedGetData<'a, T> {
fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<T>>;
fn get(&'a self, row_index: usize, field_name: &str) -> DeltaResult<T> {
let val = self.get_opt(row_index, field_name)?;
val.ok_or_else(|| Error::MissingData(format!("Data missing for field {field_name}")))
val.ok_or_else(|| {
Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
})
Comment on lines +132 to +134
Collaborator

intentional/permanent change? Or just for debugging?

Collaborator Author

intentional, since this error occurs in more than one place

Collaborator

aside: I wonder if we should start adding some kind of "location code" as a (much) cheaper alternative to backtraces, that also stays stable as the code base evolves around it?

Collaborator Author

Yeah, that could work. I'm not too worried about perf for backtraces as they should only appear in error cases though
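
A minimal sketch of what such a "location code" might look like, assuming a hand-maintained enum of error sites. `LocationCode`, `MissingData`, and `require_field` are hypothetical names for illustration and are not part of the kernel's `Error` type:

```rust
use std::fmt;

/// Hypothetical stable identifiers for the sites that can raise a "missing data" error.
/// Unlike file/line pairs, these stay the same when the surrounding code moves.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LocationCode {
    TypedGetDataGet,
    DedupVisitorAddPath,
}

/// Hypothetical error carrying a location code instead of a captured backtrace.
#[derive(Debug)]
struct MissingData {
    field_name: String,
    location: LocationCode,
}

impl fmt::Display for MissingData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "Data missing for field {} [at {:?}]",
            self.field_name, self.location
        )
    }
}

impl std::error::Error for MissingData {}

/// Stand-in for a `get`-style accessor: turns a missing value into a tagged error.
fn require_field<T>(
    val: Option<T>,
    field_name: &str,
    location: LocationCode,
) -> Result<T, MissingData> {
    val.ok_or_else(|| MissingData {
        field_name: field_name.to_string(),
        location,
    })
}
```

The trade-off in this sketch is that each call site must pass its own code by hand, whereas a backtrace is captured automatically but only on the error path.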

}
}

159 changes: 144 additions & 15 deletions kernel/src/scan/log_replay.rs
@@ -1,15 +1,16 @@
use std::clone::Clone;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use tracing::debug;

use super::data_skipping::DataSkippingFilter;
use super::ScanData;
use super::{ScanData, Transform};
use crate::actions::get_log_add_schema;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef};
use crate::scan::DeletionVectorDescriptor;
use crate::scan::{DeletionVectorDescriptor, TransformExpr};
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator};
@@ -44,12 +45,17 @@ struct LogReplayScanner {
struct AddRemoveDedupVisitor<'seen> {
seen: &'seen mut HashSet<FileActionKey>,
selection_vector: Vec<bool>,
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
transforms: HashMap<usize, Expression>,
is_log_batch: bool,
}

impl AddRemoveDedupVisitor<'_> {
/// Checks if log replay already processed this logical file (in which case the current action
/// should be ignored). If not already seen, register it so we can recognize future duplicates.
/// Returns `true` if we have seen the file and should ignore it, `false` if we have not seen it
/// and should process it.
fn check_and_record_seen(&mut self, key: FileActionKey) -> bool {
// Note: each (add.path + add.dv_unique_id()) pair has a
// unique Add + Remove pair in the log. For example:
@@ -83,11 +89,11 @@ impl AddRemoveDedupVisitor<'_> {
// have a remove with a path at index 4. In either case, extract the three dv getters at
// indexes that immediately follow a valid path index.
let (path, dv_getters, is_add) = if let Some(path) = getters[0].get_str(i, "add.path")? {
(path, &getters[1..4], true)
(path, &getters[2..5], true)
} else if !self.is_log_batch {
return Ok(false);
} else if let Some(path) = getters[4].get_opt(i, "remove.path")? {
(path, &getters[5..8], false)
} else if let Some(path) = getters[5].get_opt(i, "remove.path")? {
(path, &getters[6..9], false)
} else {
return Ok(false);
};
@@ -103,7 +109,33 @@ impl AddRemoveDedupVisitor<'_> {

// Process both adds and removes, but only return not already-seen adds
let file_key = FileActionKey::new(path, dv_unique_id);
Ok(!self.check_and_record_seen(file_key) && is_add)
let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
Collaborator

Related to #615:

Data skipping runs before this visitor, which means we can't use the partition values for data skipping in its current form.

How should we proceed? Even if we run a partition value extraction visitor before data skipping, that builds a hash map of parsed partition value literals (instead of embedding them in a struct expression), we still can't use the normal data skipping expression machinery. We'd almost need the row visitor itself to apply partition pruning, using a DefaultPredicateEvaluator that sits on top of the partition values map. The (big) downside of that approach is it won't reliably handle predicates that mix references to partition columns and normal columns, e.g. the following predicate would have no data skipping at all, because both predicate evaluators would reject the OR due to a missing leg:

WHERE partition_col1 = 10 OR value_col2 = 20

It would at least handle top-level AND gracefully, tho:

WHERE partition_col1 = 10 AND value_col2 = 20

(because each predicate evaluator would work with the subset of the AND it understands)
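
The AND/OR asymmetry described above can be sketched with a toy three-valued evaluator. This is an illustration under stated assumptions, not the kernel's `DefaultPredicateEvaluator`: `Pred`, `eval`, and `can_skip` are hypothetical names, and `None` stands for "this evaluator cannot see the column".

```rust
use std::collections::HashMap;

/// Toy predicate: just enough structure to show the AND/OR asymmetry.
enum Pred {
    Eq(&'static str, i64),
    And(Box<Pred>, Box<Pred>),
    Or(Box<Pred>, Box<Pred>),
}

/// Evaluates `pred` against the columns this evaluator knows about.
/// `None` means "cannot tell", e.g. the column is not visible here.
fn eval(pred: &Pred, known: &HashMap<&str, i64>) -> Option<bool> {
    match pred {
        Pred::Eq(col, want) => known.get(*col).map(|got| got == want),
        // AND is false as soon as one leg is provably false.
        Pred::And(l, r) => match (eval(l, known), eval(r, known)) {
            (Some(false), _) | (_, Some(false)) => Some(false),
            (Some(true), Some(true)) => Some(true),
            _ => None,
        },
        // OR is false only if both legs are provably false.
        Pred::Or(l, r) => match (eval(l, known), eval(r, known)) {
            (Some(true), _) | (_, Some(true)) => Some(true),
            (Some(false), Some(false)) => Some(false),
            _ => None,
        },
    }
}

/// A file can be skipped only when the predicate is provably false.
fn can_skip(pred: &Pred, known: &HashMap<&str, i64>) -> bool {
    eval(pred, known) == Some(false)
}
```

With only `partition_col1 = 11` known, the AND form is provably false and the file can be skipped, while the OR form stays undecided because the `value_col2` leg is missing.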

Collaborator

> Even if we run a partition value extraction visitor before data skipping, that builds a hash map of parsed partition value literals (instead of embedding them in a struct expression), we still can't use the normal data skipping expression machinery.

Could you explain why we can't use the normal data skipping expression machinery? Current data skipping reads the stats field of add actions. I imagine we could use a visitor to extract the partition values along with the stats, then write back the stats field with updated values. Then data skipping proceeds as normal. idk if this is perhaps expensive, but I think it'll be important to be able to do data skipping on predicates with mixed references.

Collaborator

We definitely want the effect of data skipping, one way or another. I just meant that today's data skipping flow happens before the row visitor that could extract and parse partition values.

Either we need to add a second visitor that runs first and updates the stats column, or we apply partition skipping as a completely separate step (that could run before or after normal data skipping). Updating the stats column has several disadvantages:

  1. Needs a separate visitor pass (runtime cost)
  2. We don't currently have any API for updating an EngineData (we only have expression eval). We know we need to eventually add such capability, but we don't have it yet.
  3. Stats-based pruning code isn't a great fit for partition values, because it wouldn't support nullcount-based pruning, and min/max-based pruning is needlessly complex when min=max always holds for partition values.

That makes me wonder if we should apply partition pruning after stats-based pruning, as part of the existing row visitor that already filters out previously seen files:

  • Parse partition values into a HashMap<ColumnName, Scalar>, which already has #[cfg(test)] impl ResolveColumnAsScalar in predicates/mod.rs (just need to remove the feature flag from it).
  • Wrap a DefaultPredicateEvaluator around the partition values hashmap, and evaluate it.
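
A rough sketch of the two steps above, assuming the partition values have already been parsed per row. `Scalar` here is a simplified stand-in for the kernel type, and `PartitionEq` and `prune_by_partition` are hypothetical helpers rather than existing kernel APIs:

```rust
use std::collections::HashMap;

/// Simplified stand-in for the kernel's `Scalar`.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Integer(i32),
    String(String),
}

/// Hypothetical partition predicate of the form `column = literal`.
struct PartitionEq {
    column: String,
    literal: Scalar,
}

impl PartitionEq {
    /// `Some(_)` if the file's partition value decides the predicate,
    /// `None` if the column is absent (so the file must be kept).
    fn eval(&self, partition_values: &HashMap<String, Scalar>) -> Option<bool> {
        partition_values
            .get(&self.column)
            .map(|value| *value == self.literal)
    }
}

/// Deselects every still-selected row whose partition values provably fail
/// the predicate. Rows already deselected (e.g. by stats-based skipping or
/// deduplication) are left untouched.
fn prune_by_partition(
    selection_vector: &mut [bool],
    per_row_partition_values: &[HashMap<String, Scalar>],
    predicate: &PartitionEq,
) {
    for (selected, values) in selection_vector.iter_mut().zip(per_row_partition_values) {
        if *selected && predicate.eval(values) == Some(false) {
            *selected = false;
        }
    }
}
```

Because the sketch only ever flips `true` to `false`, it composes with whatever pruning ran earlier on the same selection vector.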

Collaborator

Aha, that makes sense. So move it until later to avoid complicating the existing data skipping and to avoid the runtime cost.

Collaborator

As for mixed references -- it will work for a majority of cases, because most partition predicates are simple top-level conjuncts, like this:

WHERE partition_col1 = 10 AND value_col2 = 20

The partition pruning code would handle the first conjunct (ignoring the second), and stats pruning code would handle the second conjunct (ignoring the first). This is actually how Delta-spark does it today.

Collaborator Author

Seems like being out was a great way for me to get this resolved :)

In seriousness though, that suggestion makes sense. We can let the existing flow prune via stats, and then just run the predicate evaluator over the extracted hashmap in the visitor, which can modify its already existing selection vector to prune files where the partition doesn't match.

With respect to this PR, I think the code flow then still makes sense, and we can take partition pruning as a follow-up?

// compute transform here
if let Some(ref transform) = self.transform {
Collaborator

Nit: I like to avoid nesting where possible. I wonder if we can do early returns or factor this out into a resolve_transform_expr function.

Collaborator

Agree on both nesting and rule of 30 here.

Also, this code is redundant:

let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
    ... do stuff ...
}
Ok(is_add && !have_seen)

The early return would make very clear what's going on:

if !is_add || have_seen {
    return Ok(false);
}
... do stuff ...
Ok(true)

Collaborator Author

yep thanks, moved into its own function

let partition_values: HashMap<_, _> = getters[1].get(i, "add.partitionValues")?;
let transforms = transform
.iter()
.map(|transform_expr| match transform_expr {
TransformExpr::Partition(field_idx) => {
let field = self.logical_schema.fields.get_index(*field_idx);
let Some((_, field)) = field else {
return Err(Error::generic(
"logical schema did not contain expected field, can't transform data",
));
};
let name = field.physical_name();
let value_expression =
super::parse_partition_value(partition_values.get(name), field.data_type())?;
Ok(value_expression.into())
}
TransformExpr::Static(field_expr) => Ok(field_expr.clone()),
})
.try_collect()?;
self.transforms.insert(i, Expression::Struct(transforms));
}
}
Ok(!have_seen && is_add)
}
}

@@ -113,8 +145,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
const STRING: DataType = DataType::STRING;
const INTEGER: DataType = DataType::INTEGER;
let ss_map: DataType = MapType::new(STRING, STRING, true).into();
let types_and_names = vec![
(STRING, column_name!("add.path")),
(ss_map, column_name!("add.partitionValues")),
(STRING, column_name!("add.deletionVector.storageType")),
(STRING, column_name!("add.deletionVector.pathOrInlineDv")),
(INTEGER, column_name!("add.deletionVector.offset")),
@@ -132,12 +166,12 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
} else {
// All checkpoint actions are already reconciled and Remove actions in checkpoint files
// only serve as tombstones for vacuum jobs. So we only need to examine the adds here.
(&names[..4], &types[..4])
(&names[..5], &types[..5])
}
}

fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
let expected_getters = if self.is_log_batch { 8 } else { 4 };
let expected_getters = if self.is_log_batch { 9 } else { 5 };
require!(
getters.len() == expected_getters,
Error::InternalError(format!(
@@ -207,6 +241,8 @@ impl LogReplayScanner {
&mut self,
add_transform: &dyn ExpressionEvaluator,
actions: &dyn EngineData,
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
is_log_batch: bool,
) -> DeltaResult<ScanData> {
// Apply data skipping to get back a selection vector for actions that passed skipping. We
@@ -220,24 +256,29 @@ impl LogReplayScanner {
let mut visitor = AddRemoveDedupVisitor {
seen: &mut self.seen,
selection_vector,
logical_schema,
transform,
transforms: HashMap::new(),
is_log_batch,
};
visitor.visit_rows_of(actions)?;

// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
let selection_vector = visitor.selection_vector;
let result = add_transform.evaluate(actions)?;
Ok((result, selection_vector))
Ok((result, selection_vector, visitor.transforms))
}
}

/// Given an iterator of (engine_data, bool) tuples and a predicate, returns an iterator of
/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
/// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag
/// indicates whether the record batch is a log or checkpoint batch.
pub fn scan_action_iter(
pub(crate) fn scan_action_iter(
Collaborator Author

Note this is a significant change, as we no longer expose this function. In discussion so far we've agreed that it basically should never have been pub, and I just made a mistake when doing so. An engine should call scan_data, which mostly just proxies to this but doesn't expose internal details to the engine.

Open to discussion though.

Collaborator

pub(crate) SGTM!

engine: &dyn Engine,
action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
physical_predicate: Option<(ExpressionRef, SchemaRef)>,
) -> impl Iterator<Item = DeltaResult<ScanData>> {
let mut log_scanner = LogReplayScanner::new(engine, physical_predicate);
@@ -249,20 +290,39 @@ pub fn scan_action_iter(
action_iter
.map(move |action_res| {
let (batch, is_log_batch) = action_res?;
log_scanner.process_scan_batch(add_transform.as_ref(), batch.as_ref(), is_log_batch)
log_scanner.process_scan_batch(
add_transform.as_ref(),
batch.as_ref(),
logical_schema.clone(),
transform.clone(),
is_log_batch,
)
})
.filter(|res| res.as_ref().map_or(true, |(_, sv)| sv.contains(&true)))
.filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true)))
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use crate::expressions::{column_name, Scalar};
use crate::scan::{
get_state_info,
state::{DvInfo, Stats},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
test_utils::{
Collaborator

nit: we can perhaps flatten these imports.

add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
run_with_validate_callback,
},
Scan,
};
use crate::{
engine::sync::SyncEngine,
schema::{DataType, SchemaRef, StructField, StructType},
Expression,
};

use super::scan_action_iter;

// dv-info is more complex to validate, we validate that works in the test for visit_scan_files
// in state.rs
fn validate_simple(
@@ -288,6 +348,8 @@ mod tests {
fn test_scan_action_iter() {
run_with_validate_callback(
vec![add_batch_simple()],
None, // not testing schema
None, // not testing transform
&[true, false],
(),
validate_simple,
@@ -298,9 +360,76 @@ mod tests {
fn test_scan_action_iter_with_remove() {
run_with_validate_callback(
vec![add_batch_with_remove()],
None, // not testing schema
None, // not testing transform
&[false, false, true, false],
(),
validate_simple,
);
}

#[test]
fn test_no_transforms() {
let batch = vec![add_batch_simple()];
let logical_schema = Arc::new(crate::schema::StructType::new(vec![]));
let iter = scan_action_iter(
&SyncEngine::new(),
batch.into_iter().map(|batch| Ok((batch as _, true))),
logical_schema,
None,
None,
);
for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
assert!(transforms.is_empty(), "Should have no transforms");
}
}

#[test]
fn test_simple_transform() {
let schema: SchemaRef = Arc::new(StructType::new([
StructField::new("value", DataType::INTEGER, true),
StructField::new("date", DataType::DATE, true),
]));
let partition_cols = ["date".to_string()];
let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap();
let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields)));
let batch = vec![add_batch_with_partition_col()];
let iter = scan_action_iter(
&SyncEngine::new(),
batch.into_iter().map(|batch| Ok((batch as _, true))),
schema,
static_transform,
None,
);

fn validate_transform(transform: Option<&Expression>, expected_date_offset: i32) {
assert!(transform.is_some());
if let Expression::Struct(inner) = transform.unwrap() {
if let Expression::Column(ref name) = inner[0] {
assert_eq!(name, &column_name!("value"), "First col should be 'value'");
} else {
panic!("Expected first expression to be a column");
}
if let Expression::Literal(ref scalar) = inner[1] {
assert_eq!(
scalar,
&Scalar::Date(expected_date_offset),
"Didn't get expected date offset"
);
} else {
panic!("Expected second expression to be a literal");
}
} else {
panic!("Transform should always be a struct expr");
}
}

for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
assert_eq!(transforms.len(), 2, "Should have two transforms");
validate_transform(transforms.get(&0), 17511);
validate_transform(transforms.get(&1), 17510);
}
}
}