Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Part 1, Read transforms via expressions: Just compute the expression and return it. #607

Open
wants to merge 28 commits into
base: main
Choose a base branch
from

Conversation

nicklan
Copy link
Collaborator

@nicklan nicklan commented Dec 18, 2024

What changes are proposed in this pull request?

This is the initial part of moving to using expressions to express transformations when reading data. What this PR does is:

  • Compute a "static" transform, which is just a set of column expressions that need to be passed directly through without change, or enough metadata for lower levels to fill in a "fixup" expression
  • The static transform is passed into the iterator that parses each Add file
  • When parsing the Add file, if there are needed fix-ups (just partition columns today), the correct expression is created, and inserted into a row indexed map
  • This map is returned so the caller can find out for a given row what, if any, expression needs to be applied when reading the specified row

Follow-up PRs:

Each of those are more invasive and end up touching significant code, so I'm staging this as much as possible to make reviews easier.

How was this change tested?

Unit tests, and inspection of resultant expressions when run on tables

@nicklan nicklan requested review from zachschuermann, scovich and OussamaSaoudi-db and removed request for zachschuermann and scovich December 18, 2024 20:54
@github-actions github-actions bot added the breaking-change Change that will require a version bump label Dec 18, 2024
}
}

/// Given an iterator of (engine_data, bool) tuples and a predicate, returns an iterator of
/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
/// be processed to complete the scan. Non-selected rows _must_ be ignored. The boolean flag
/// indicates whether the record batch is a log or checkpoint batch.
pub fn scan_action_iter(
pub(crate) fn scan_action_iter(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note this is a significant change as we not longer expose this function. In discussion so far we've agreed that it basically should never have been pub, and I just made a mistake when doing so. An engine should call scan_data which mostly just proxies to this, but doesn't expose internal details to the engine.

Open to discussion though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pub(crate) SGTM!

Copy link

codecov bot commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 87.57396% with 21 lines in your changes missing coverage. Please review.

Project coverage is 83.50%. Comparing base (b3546f0) to head (0ea983d).

Files with missing lines Patch % Lines
kernel/src/scan/log_replay.rs 85.48% 7 Missing and 11 partials ⚠️
kernel/src/scan/mod.rs 94.87% 0 Missing and 2 partials ⚠️
ffi/src/scan.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #607      +/-   ##
==========================================
+ Coverage   83.45%   83.50%   +0.04%     
==========================================
  Files          75       75              
  Lines       16918    17072     +154     
  Branches    16918    17072     +154     
==========================================
+ Hits        14119    14256     +137     
- Misses       2145     2152       +7     
- Partials      654      664      +10     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

kernel/src/scan/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/scan/log_replay.rs Outdated Show resolved Hide resolved
pub(crate) fn add_batch_with_partition_col() -> Box<ArrowEngineData> {
let handler = SyncJsonHandler {};
let json_strings: StringArray = vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues": {"date": "2017-12-11"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can we improve the MockTable to write tests like these? One nice part of the test you're writing is that they don't have to create temp directories or perform any io.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I think we should just have a nice way to declare the properties we want of the table in code. Not something for this PR but on the todo list for sure!

Comment on lines 419 to 425
let static_transform = if self.have_partition_cols
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None
{
Some(Arc::new(Scan::get_static_transform(&self.all_fields)))
} else {
None
};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is better, but you could do something like this:

Suggested change
let static_transform = if self.have_partition_cols
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None
{
Some(Arc::new(Scan::get_static_transform(&self.all_fields)))
} else {
None
};
let static_transform = (self.have_partition_cols
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None)
.then_some(Arc::new(Scan::get_static_transform(&self.all_fields)));

let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
// compute transform here
if let Some(ref transform) = self.transform {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I like to avoid nesting where possible. I wonder if we can do early returns or factor this out into a resolve_transform_expr function.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree on both nesting and rule of 30 here.

Also, this code is redundant:

let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
    ... do stuff ...
}
Ok(is_add && !have_seen)

The early return would make very clear what's going on:

if !is_add || have_seen {
    return Ok(false);
}
... do stuff ...
Ok(true)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep thanks, moved into its own function

pub type ScanData = (
Box<dyn EngineData>,
Vec<bool>,
HashMap<usize, ExpressionRef>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just like CDF, another case where we want to return maps to the engine. Would be curious to hear what the plan for engine integration/FFI is.

@nicklan nicklan changed the title Read transforms via expressions. Part 1: Just compute the expression and return it. Part 1, Read transforms via expressions: Just compute the expression and return it. Dec 20, 2024
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When parsing the Add file, if there are needed fix-ups (just partition columns today), the correct expression is created, and inserted into a row indexed map

Why do we need a map here? It seems like we either have a fixup for every row, or for no rows? Just apply the fixup conditionally if we see a non-empty vec of fixups?

Comment on lines +132 to +134
val.ok_or_else(|| {
Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intentional/permanent change? Or just for debugging?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

intentional, since this error occurs in more than one place

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: I wonder if we should start adding some kind of "location code" as a (much) cheaper alternative to backtraces, that also stays stable as the code base evolves around it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that could work. I'm not too worried about perf for backtraces as they should only appear in error cases though

let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
// compute transform here
if let Some(ref transform) = self.transform {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree on both nesting and rule of 30 here.

Also, this code is redundant:

let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
    ... do stuff ...
}
Ok(is_add && !have_seen)

The early return would make very clear what's going on:

if !is_add || have_seen {
    return Ok(false);
}
... do stuff ...
Ok(true)

Comment on lines 112 to 113
let have_seen = self.check_and_record_seen(file_key);
if is_add && !have_seen {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to #615:

Data skipping runs before this visitor, which means we can't use the partition values for data skipping in its current form.

How should we proceed? Even if we run a partition value extraction visitor before data skipping, that builds a hash map of parsed partition value literals (instead of embedding them in a struct expression), we still can't use the normal data skipping expression machinery. We'd almost need the row visitor itself to apply partition pruning, using a DefaultPredicateEvaluator that sits on top of the partition values map. The (big) downside of that approach is it won't reliably handle predicates that mix references to partition columns and normal columns, e.g. the following predicate would have no data skipping at all, because both predicate evaluators would reject the OR due to a missing leg:

WHERE partition_col1 = 10 OR value_col2 = 20

It would at least handle top-level AND gracefully, tho:

WHERE partition_col1 = 10 AND value_col2 = 20

(because each predicate evaluator would work with the subset of the AND it understands)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we run a partition value extraction visitor before data skipping, that builds a hash map of parsed partition value literals (instead of embedding them in a struct expression), we still can't use the normal data skipping expression machinery.

Could you explain why we can't use the normal data skipping expression machinery? Current data skipping reads the stats field of add actions. I imagine we could use a visitor to extract the partition values along with the stats, then write back the stats field with updated values. Then data skipping proceeds as normal. idk if this is perhaps expensive, but I think it'll be important to be able to do data skipping on predicates with mixed references.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely want the effect of data skipping, one way or another. I just meant that today's data skipping flow happens before the row visitor that could extract and parse partition values.

Either we need to add a second visitor that runs first and updates the stats column, or we apply partition skipping as a completely separate step (that could run before or after normal data skipping). Updating the stats column has several disadvantages:

  1. Needs a separate visitor pass (runtime cost)
  2. We don't currently have any API for updating an EngineData (we only have expression eval). We know we need to eventually add such capability, but we don't have it yet.
  3. Stats-based pruning code isn't a great fit for partition values, because it wouldn't support nullcount based pruning and min/max based pruning is needlessly complex when always min=max for partition values.

That makes me wonder if we should apply partition pruning after stats-based pruning, as part of the existing row visitor that already filters out previously seen files:

  • Parse partition values into a HashMap<ColumnName, Scalar>, which already has #[cfg(test)] impl ResolveColumnAsScalar in predicates/mod.rs (just need to remove the feature flag from it).
  • Wrap a DefaultPredicateEvaluator around the partition values hashmap, and evaluate it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha that makes sense. So move it till later to avoid complicating the existing data skipping and avoiding the runtime cost.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As for mixed references -- it will work for a majority of cases, because most partition predicates are simple top-level conjuncts, like this:

WHERE partition_col1 = 10 AND value_col2 = 20

The partition pruning code would handle the first conjunct (ignoring the second), and stats pruning code would handle the second conjunct (ignoring the first). This is actually how Delta-spark does it today.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like being out was a great way for me to get this resolved :)

In seriousness though, that suggestion makes sense. We can let the existing flow prune via stats, and then just run the predicate evaluator over the extracted hashmap in the visitor, which can modify its already existing selection vector to prune files where the partition doesn't match.

wrt. to this PR, I think the code flow then still makes sense, and we can take partition pruning as a follow-up?

@timsaucer timsaucer mentioned this pull request Jan 7, 2025
2 tasks
Copy link
Collaborator

@OussamaSaoudi-db OussamaSaoudi-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good to me 👍

state::{DvInfo, Stats},
test_utils::{add_batch_simple, add_batch_with_remove, run_with_validate_callback},
test_utils::{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can perhaps flatten these imports.

@nicklan
Copy link
Collaborator Author

nicklan commented Jan 8, 2025

Why do we need a map here? It seems like we either have a fixup for every row, or for no rows? Just apply the fixup conditionally if we see a non-empty vec of fixups?

It's not the same for every row. The file could have a different value for the partition column for instance, or in the future each file could need the variant cols fixed up differently.

Perhaps you meant we could just have a Vec<Option<Transform>> where the index maps to the row? I considered this as well and decided that we'd potentially be creating large vecs of None in the cases that the majority of the batch was not Add actions, and so this was more compact and efficient. Happy to revisit that choice if you think otherwise though.

@scovich
Copy link
Collaborator

scovich commented Jan 8, 2025

Why do we need a map here? It seems like we either have a fixup for every row, or for no rows? Just apply the fixup conditionally if we see a non-empty vec of fixups?

It's not the same for every row. The file could have a different value for the partition column for instance, or in the future each file could need the variant cols fixed up differently.

Perhaps you meant we could just have a Vec<Option<Transform>> where the index maps to the row? I considered this as well and decided that we'd potentially be creating large vecs of None in the cases that the majority of the batch was not Add actions, and so this was more compact and efficient. Happy to revisit that choice if you think otherwise though.

I think we have three potential cases:

  1. No fixups are needed (e.g. not partitioned, no variant shredding, etc). The container should be empty or even missing (don't bloat it with a bunch of None). Doesn't matter which container type we use.
  2. ~Most rows need a fixup. Vec would be preferable to HashMap in that case (zipper join instead of a hash join)
  3. ~Few rows need a fixup (e.g. because many add actions were filtered out by data skipping). Open question whether Vec<Option> or HashMap is better.

Vec is definitely good for 1/ and 2/, and I estimate that Vec will also work Just Fine for 3/, given the batch sizes we expect to encounter in practice.

Rationale:

  • Each fixup will be fairly large, perhaps O(100 bytes)
  • To hit a space problem we would need very good data skipping. For example, if we assume 1 fixup is equivalent to 25 None, we would need 96% skipping or better to get even 2x bloat.
  • Further, we would need a large batch size (128k rows or more) for all those None to occupy more than a couple MB of actual memory. Checkpoint parquet files only hold 30k rows each, so that leaves only json commits which are usually not very large.
  • Even if we did hit both conditions above, data skipping is very query-dependent and so we would have a 25x bigger space problem in case we ever got a query with poor data skipping.

@nicklan
Copy link
Collaborator Author

nicklan commented Jan 9, 2025

Vec is definitely good for 1/ and 2/, and I estimate that Vec will also work Just Fine for 3/, given the batch sizes we expect to encounter in practice.

Following discussion, agree a Vec makes more sense, and have changed it to that

Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple nits to fix before merge, but otherwise LGTM

Comment on lines +132 to +134
val.ok_or_else(|| {
Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: I wonder if we should start adding some kind of "location code" as a (much) cheaper alternative to backtraces, that also stays stable as the code base evolves around it?

));
};
let name = field.physical_name();
let value_expression = super::parse_partition_value(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Looking at #624, I wonder if there's a (worthwhile) way to parse partition values only once per file action? But partition pruning and data fixup happen so far apart that I suspect it would be simpler (and maybe even cheaper) to parse a second time rather than try to build up and track a big side collection of parsed partition values.

It would perhaps be a different story if we had a clean way to convert partition values from string-string map to parsed struct using expressions, because then the partition values would be conveniently embedded in the log replay engine data. But I don't see that happening any time soon, given how much effort it would take to add map and string parsing expression support.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a good point. Depending on how we merge things, we should consider looking at it when the second of this or #624 go in

partition_values.get(name),
field.data_type(),
)?;
Ok(value_expression.into())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this isn't an expression (yet). Maybe better to call it partition_value (scalar), which then gets converted into a (literal) expression?

Suggested change
Ok(value_expression.into())
Ok(partition_value.into())

Comment on lines 143 to 146
let have_seen = self.check_and_record_seen(file_key);
if !is_add || have_seen {
return Ok(false);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a dangerous change (because somebody trying to "optimize" the code might produce control flow that skips non-adds without checking them first). Now that you no longer need the have_seen multiple times, can we partially revert so it matches the code comment at L142 again?

Suggested change
let have_seen = self.check_and_record_seen(file_key);
if !is_add || have_seen {
return Ok(false);
}
if self.check_and_record_seen(file_key) || !is_add {
return Ok(false);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, we should probably update the comment to match the new code:

// Check both adds and removes (skipping already-seen), but only transform and return adds


fn validate_transform(transform: Option<&ExpressionRef>, expected_date_offset: i32) {
assert!(transform.is_some());
if let Expression::Struct(inner) = transform.unwrap().as_ref() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good place for let-else matching?

            let Expression::Struct(inner) = transform.unwrap().as_ref() else {
                panic!("Transform should always be a struct expr");
            };
            assert_eq!(...);
            
            let Expression::Column(ref name) = inner[0] else {
                panic!("Expected first expression to be a column");
            };
            assert_eq!(...);
            
            let Expression::Literal(ref scalar) = inner[1] else {
                panic!("Expected second expression to be a literal");
            };
            assert_eq!(...);

(less indentation => more readable)

@@ -371,11 +399,22 @@ impl Scan {
/// the query. NB: If you are using the default engine and plan to call arrow's
/// `filter_record_batch`, you _need_ to extend this vector to the full length of the batch or
/// arrow will drop the extra rows.
/// - `HashMap<usize, Expression>`: Transformation expressions that need to be applied. For each
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's using a Vec now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice catch. updated and updated the description

Comment on lines 412 to 414
// Compute the static part of the transformation. This is `None` if no transformation is
// needed (currently just means no partition cols, but will be extended for other transforms
// as we support them)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment doesn't reference column mapping? Should it?

Also, what other kind of transform might there be, besides "static" referenced here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Other transforms" means future things we may need to apply transforms for. So, variant decoding for example. If something needed variant decoding then the static_transform would not be None.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Change that will require a version bump
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants