
partition skipping filter #624

Open

wants to merge 13 commits into main
Conversation


@timsaucer timsaucer commented Jan 7, 2025

What changes are proposed in this pull request?

This supersedes #615

This PR adds a partition filter step similar to data skipping: a row visitor that checks whether any filters can be applied at the file level based on partition values. The approach is based on the discussion in #607.

How was this change tested?

Tested in datafusion against an existing partitioned dataset, and against the integration test datasets for both multi-partition and single-partition tables. I have also verified that combinations of data skipping and partition skipping return the correct number of files in the scan.

Remaining TODO items before this is ready for review/merge:

  • Add in unit tests
  • Ensure data types are cast based on schema (currently just evaluating the string values)

@timsaucer
Author

timsaucer commented Jan 7, 2025

@scovich I still have more unit tests to add before this is ready for review, but would you mind taking a quick look to see if it matches the design you were thinking of?

kernel/src/scan/partition_skipping.rs
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
    let getter = getters[0];
    for i in 0..row_count {
        let val = getter.get_map(i, "output")?.and_then(|m| {
Collaborator

If we maintain the invariant that Some(PartitionSkippingFilter) implies a partitioned table (see other comment above), then we can treat the partition values column as non-nullable and avoid a bunch of complexity.

Also, it should always be a query error if a partition value fails to parse, because that is table data we failed to access.
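For illustration, a minimal sketch of that error path, assuming `PrimitiveType::parse_scalar` as used elsewhere in this thread and a kernel `Error::generic`-style constructor:

    use delta_kernel::expressions::Scalar;
    use delta_kernel::schema::PrimitiveType;
    use delta_kernel::{DeltaResult, Error};

    /// Sketch only: a partition value that fails to parse is table data we
    /// failed to access, so propagate a query error instead of skipping silently.
    fn parse_partition_value(primitive_type: &PrimitiveType, raw: &str) -> DeltaResult<Scalar> {
        primitive_type
            .parse_scalar(raw)
            .map_err(|e| Error::generic(format!("invalid partition value {raw:?}: {e}")))
    }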

Collaborator

Update: The partition values column is anyway non-nullable, according to the Delta spec:

The schema of the add action is as follows:

Field Name      | Data Type           | Description                                                                                          | optional/required
...             | ...                 | ...                                                                                                  | ...
partitionValues | Map[String, String] | A map from partition column to value for this logical file. See also Partition Value Serialization. | required

(I think we decided in past discussions that a nullable partition value may have no corresponding map entry -- but a strict reading of the Delta spec suggests that the entry should still be there and just be an empty string)

Collaborator

NOTE: The default predicate evaluator's is_null method will treat None differently from Scalar::Null, so we should probably do the work to ensure that missing partition value entries (if tolerated) are correctly translated to null scalars. We anyway need to validate the partition schema, so we can probably pre-populate the partition value map with nulls at the same time and let the visitor overwrite the entries as needed?

Collaborator

Overall, that suggests something like the following:

let mut partition_values = self.null_partition_values.clone();
let raw_partition_values: MapItem<'_> = getter.get(i, "output")?;
for (k, v) in raw_partition_values.materialize() {
    ...
    let scalar = primitive_type.parse_scalar(v)?;
    partition_values.insert(ColumnName::new([k]), scalar);
}
let filter = DefaultPredicateEvaluator::from(partition_values);

The above assumes that the visitor has a new field, null_partition_values: &'map HashMap<ColumnName, Scalar>, which the caller already prepopulated with null scalars by looking at the table's schema and partition column names (preferably once, when building the PartitionSkippingFilter itself).
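A sketch of that one-time prepopulation, reusing the `StructType`, `ColumnName`, and `Scalar` types named in this thread (helper name and exact signatures assumed, not the PR's actual code):

    use std::collections::HashMap;

    use delta_kernel::expressions::{ColumnName, Scalar};
    use delta_kernel::schema::StructType;

    /// Sketch: build the all-null partition value map once, when constructing the
    /// PartitionSkippingFilter, so each visited row only overwrites the entries
    /// that are actually present in add.partitionValues.
    fn null_partition_values(
        schema: &StructType,
        partition_columns: &[String],
    ) -> HashMap<ColumnName, Scalar> {
        schema
            .fields()
            .filter(|f| partition_columns.contains(f.name()))
            .map(|f| (ColumnName::new([f.name().as_str()]), Scalar::Null(f.data_type().clone())))
            .collect()
    }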

Author

In the most recent push I'm building the resolver by going through each of the partition columns and extracting the value from the materialized getter. When they're missing, I'm setting it to Scalar::Null. I think this has the same effect as what you're suggesting.

Collaborator

In the most recent push I'm building the resolver by going through each of the partition columns and extracting the value from the materialized getter. When they're missing, I'm setting it to Scalar::Null. I think this has the same effect as what you're suggesting.

Yes, that should work.
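A minimal sketch of the shape being described here, under the same assumptions as the snippets above (the PR's actual code may differ, and `Error::generic` plus the `Display` impl for `DataType` are assumed):

    use std::collections::HashMap;

    use delta_kernel::expressions::{ColumnName, Scalar};
    use delta_kernel::schema::{DataType, StructField};
    use delta_kernel::{DeltaResult, Error};

    /// Sketch: walk the partition columns from the schema, look each one up in
    /// the materialized partitionValues map, and fall back to Scalar::Null for
    /// missing entries so IS [NOT] NULL predicates still evaluate correctly.
    fn build_resolver(
        partition_fields: &[StructField],
        partition_values: &HashMap<String, String>,
    ) -> DeltaResult<HashMap<ColumnName, Scalar>> {
        let mut resolver = HashMap::new();
        for field in partition_fields {
            let scalar = match (partition_values.get(field.name()), field.data_type()) {
                (Some(raw), DataType::Primitive(pt)) => pt.parse_scalar(raw)?,
                (Some(_), dt) => {
                    return Err(Error::generic(format!("unsupported partition type: {dt}")))
                }
                // Missing entry: a known NULL, not an unknown value.
                (None, dt) => Scalar::Null(dt.clone()),
            };
            resolver.insert(ColumnName::new([field.name().as_str()]), scalar);
        }
        Ok(resolver)
    }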

kernel/src/scan/partition_skipping.rs

fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
    let getter = getters[0];
    for i in 0..row_count {
Collaborator

We should consider passing (and updating) the existing selection vector instead of creating a new one, because not all rows of the data we visit are even valid add actions. That way, we only try to extract partition values for rows that have survived this far, and we clear the selection bit if the filter prunes out the file.
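A sketch of that flow, with `keep_row` standing in for the partition-value extraction and predicate evaluation (both the method shape and the helper name are hypothetical):

    /// Sketch: reuse the selection vector produced by earlier phases. Rows
    /// already deselected (non-file actions, stats-skipped files) are never
    /// inspected, and rows pruned here just have their bit cleared.
    fn visit<'a>(
        &mut self,
        row_count: usize,
        getters: &[&'a dyn GetData<'a>],
        selection_vector: &mut [bool],
    ) -> DeltaResult<()> {
        for i in 0..row_count {
            if selection_vector[i] && !self.keep_row(i, getters)? {
                selection_vector[i] = false; // pruned by partition values
            }
        }
        Ok(())
    }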

Collaborator

Yes. I think we should have an additive vector to minimize what work we need to do, and to keep things simple.

Collaborator

Update: It's not just about minimizing work. Right now, we're attempting to apply partition filters to every action, even non-file actions. Which forces us to treat the partition values column as nullable even tho the Delta spec says it's required. That means extra complexity to compensate for possible nullness, and risk of missing a corrupt table whose add actions illegally lack the add.partitionValues column.

Collaborator

Perhaps it would be simpler to inject partition pruning into the existing AddRemoveDedupVisitor::is_valid_add method, instead of defining a whole new visitor? It already accepts a selection vector, and it already skips non-file actions. We would just have to add the partition values to the schema it works with. Because add and remove actions both always provide partition columns, we could partition prune both before calling AddRemoveDedupVisitor::self.check_and_record_seen, to avoid bloating the hash table with pruned entries.

The one complication would be how to handle non-partition tables cleanly, if we try to avoid fetching the partition values columns for non-partitioned tables. My guess is, that's over-optimizing. We should go ahead and unconditionally fetch the empty column, and just let the visitor conditionally ignore it. Something like this:

    fn is_valid_add<'a>(&mut self, i: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<bool> {
        // Add will have a path at index 0 if it is valid; otherwise, if it is a log batch, we may
        // have a remove with a path at index 5. In either case, extract the getters for partition
        // values and dv columns at indexes that immediately follow a valid path index.
        let (path, pv_getter, dv_getters, is_add) =
            if let Some(path) = getters[0].get_str(i, "add.path")? {
                (path, &getters[1], &getters[2..5], true)
            } else if !self.is_log_batch {
                return Ok(false);
            } else if let Some(path) = getters[5].get_opt(i, "remove.path")? {
                (path, &getters[6], &getters[7..10], false)
            } else {
                return Ok(false);
            };

        // Only consider adds and removes that survive partition pruning
        if !self.apply_partition_filters(i, pv_getter)? {
            return Ok(false);
        }

        let dv_unique_id = match dv_getters[0].get_opt(i, "deletionVector.storageType")? {
            Some(storage_type) => Some(DeletionVectorDescriptor::unique_id_from_parts(
                storage_type,
                dv_getters[1].get(i, "deletionVector.pathOrInlineDv")?,
                dv_getters[2].get_opt(i, "deletionVector.offset")?,
            )),
            None => None,
        };

        // Process both adds and removes, but only return not already-seen adds
        let file_key = FileActionKey::new(path, dv_unique_id);
        Ok(!self.check_and_record_seen(file_key) && is_add)
    }

    fn apply_partition_filters<'a>(&mut self, i: usize, pv_getter: &'a dyn GetData<'a>) -> DeltaResult<bool> {
        let Some(partition_filter) = self.partition_filter else {
            return Ok(true); // no filter => keep it
        };
        // extract and parse partition values, and apply the filter to them
        let partition_values: HashMap<String, String> = pv_getter.get(i, "partitionValues")?;
        todo!()
    }

Also: I think today's approach only looks at adds, and removes would still bloat up the hash table. If so, we should fix that regardless of where the partition pruning code lives.


codecov bot commented Jan 7, 2025

Codecov Report

Attention: Patch coverage is 89.13043% with 10 lines in your changes missing coverage. Please review.

Project coverage is 83.56%. Comparing base (c3a868f) to head (47d704e).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
kernel/src/scan/partition_skipping.rs 88.60% 4 Missing and 5 partials ⚠️
kernel/src/scan/log_replay.rs 92.30% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #624      +/-   ##
==========================================
+ Coverage   83.45%   83.56%   +0.11%     
==========================================
  Files          74       75       +1     
  Lines       16877    17009     +132     
  Branches    16877    17009     +132     
==========================================
+ Hits        14084    14213     +129     
- Misses       2135     2139       +4     
+ Partials      658      657       -1     


@nicklan nicklan left a comment (Collaborator)

This is great, thanks.

I think we need to figure out if we merge this or #607 first. I suspect the code flow will look rather different once both are in.

In particular, what do you think about AddRemoveDedupVisitor becoming the main point where we do both skipping and fixup expression calculation? (We'd probably want to rename it 😄)

The code flow would then be something like:

  • call process_scan_batch on the scanner
  • build initial selection vec based on stats (this should probably be folded into the new visitor as well btw)
  • construct new visitor
  • have new visitor visit actions

new visitor does:

  • resolving adds/removes to skip removed files
  • visiting add.partitionValues
  • Applying the predicate to the extracted values (as in this PR, except updating the existing selection vector embedded in the visitor)
  • Computing the predicate fixup expression if not skipped

In the end we have a single selection vector that has filtered out any Add files that should be skipped either for stats or partitions, as well as the physical->logical transforms needed, and can return that to the engine.
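In pseudo-Rust, that combined flow might look roughly like the following (every name below is hypothetical; this only illustrates the proposed shape, not either PR's actual code):

    /// Hypothetical unified log-replay step: one visitor owns the selection
    /// vector and applies stats skipping, add/remove dedup, partition pruning,
    /// and transform computation in a single pass over the actions.
    fn process_scan_batch(&mut self, actions: &dyn EngineData) -> DeltaResult<ScanOutput> {
        // 1. Initial selection vector from file stats (data skipping), which
        //    could itself be folded into the visitor later.
        let selection_vector = self.build_stats_selection_vector(actions)?;

        // 2. One visitor pass: dedup adds/removes, prune on partition values,
        //    and compute the physical->logical fixup expression for survivors.
        let mut visitor = UnifiedScanVisitor::new(selection_vector, self.partition_filter.clone());
        visitor.visit_rows_of(actions)?;

        // 3. Hand the engine a single selection vector plus the transforms.
        Ok(visitor.into_scan_output())
    }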

I'm okay with merging this first and then I can update my PR to pull this logic into the visitor, or we can merge #607 and adjust this PR.


@nicklan
Collaborator

nicklan commented Jan 8, 2025

We'll need some tests for this too, probably both unit tests for the skipping and at least one in kernel/test/read.rs to ensure we can properly skip

@timsaucer
Author

Absolutely, I wasn't going to move it out of draft until after adding tests. I was mostly looking for an early check to make sure the approach matched the suggestion in the other PR linked above.

… is necessary to skip the other partition columns. Also, if the evaluation returns a None, it means the evaluation is not possible, such as testing equality of a scalar value and a null. These should return false in the filter.
@github-actions github-actions bot added the breaking-change label (Change that will require a version bump) Jan 8, 2025
@scovich
Collaborator

scovich commented Jan 8, 2025

We'll need some tests for this too, probably both unit tests for the skipping and at least one in kernel/test/read.rs to ensure we can properly skip

Absolutely, I wasn't going to move it out of draft until after adding tests. I was mostly looking for an early check to make sure the approach matched the suggestion in the other PR linked above.

One thing to note tho -- the DefaultPredicateEvaluator itself is already tested pretty thoroughly, so any partition pruning unit tests only need to validate that the partition values map it takes as input is correct (correctly parsed, correct names, etc). The read.rs test would then be a lightweight end-to-end test to make sure everything is plumbed through correctly.
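For example, such a focused unit test might assert only the map construction, reusing the hypothetical `build_resolver` helper sketched earlier in this thread (the `DataType::DATE` shorthand and `StructField::new` signature are assumed):

    #[test]
    fn missing_partition_value_becomes_null() {
        use std::collections::HashMap;

        use delta_kernel::expressions::{ColumnName, Scalar};
        use delta_kernel::schema::{DataType, StructField};

        // One nullable partition column, with no corresponding map entry.
        let fields = vec![StructField::new("date", DataType::DATE, true)];
        let values: HashMap<String, String> = HashMap::new();

        // The resolver should report a known NULL, not a missing column.
        let resolver = build_resolver(&fields, &values).unwrap();
        assert_eq!(
            resolver.get(&ColumnName::new(["date"])),
            Some(&Scalar::Null(DataType::DATE))
        );
    }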

kernel/src/scan/partition_skipping.rs
  let filter = DefaultPredicateEvaluator::from(resolver);
- Some(filter.eval_expr(&self.predicate, false).unwrap_or(true))
+ Ok(filter.eval_expr(&self.predicate, false).unwrap_or(false))
Collaborator

This doesn't look right? If the expression evaluated to None that means we don't know whether it can be skipped and we must not skip it (see my other comment about the importance of passing Scalar::Null values for partition values that are known to be NULL, so that IS [NOT] NULL predicates work correctly).

Meanwhile, if we just need to handle the extra Option/Result nesting, we can do:

let val = getter.get_map(i, "output")?.and_then(|m| {
    ...
    Ok(filter.eval_expr(&self.predicate, false)).transpose()
});

Author

I see your point now.

Something I was struggling with is that I was testing with the multi-partitioned acceptance test examples, some of which have null values for some partitions. In particular, looking at acceptance/tests/dat/out/reader_tests/generated/multi_partitioned, I was trying to use a predicate that column "letter" equals a literal "b". This lets the nulls through because the equality check uses PartialCmp, even for equality. I get that a > or < comparison is ill defined for null values, but I would expect checking equality to be valid. I tested adding a partial_eq_scalars alongside partial_cmp_scalars and it resolves the problem I was having.

I might be coming at the problem from a different point of view than the delta-kernel design. If the intent is for nulls to make it through this kind of predicate, then I can adjust my datafusion code to add additional "is not null" predicates for each of these cases.

Collaborator

Comparisons with NULL get tricky... in SQL, any comparison against NULL produces NULL -- including e.g. NULL == NULL. There's a concept of "null-safe equal" in spark (operator <=>) which always returns either TRUE or FALSE, and there are other ways in other systems to achieve a similar result. But yes -- if you want a null-safe equal (or comparison) in a system that doesn't explicitly claim to support it, you need to turn e.g. x > 10 into x IS NOT NULL AND x > 10.

Additionally, for our predicate filters, a missing value acts as NULL some of the time (e.g. comparisons against it return NULL), but it's NOT the same when it comes to IS [NOT] NULL predicates. Because if a value is outright missing, we don't know whether that's because it was null or because it was unavailable. So a missing input has to cause IS [NOT] NULL to return NULL when it comes to data skipping. For other comparisons, the correct thing already happens naturally because the result is the same for missing vs. NULL.

To give an example, suppose the predicate was:

WHERE partition_col > 10 AND value_col = 42

Then for partition pruning, we won't have stats for value_col and we'll effectively end up with:

WHERE partition_col > 10 AND NULL

That's totally fine -- FALSE AND NULL is FALSE, so we can still skip. TRUE AND NULL is NULL, but that's fine because TRUE and NULL both force us to keep the file.
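The AND rule at work here is Kleene three-valued logic. A tiny standalone illustration in plain Rust, with `None` modeling SQL NULL (this is just a demonstration, not kernel code):

    /// Kleene (three-valued) AND. FALSE dominates, which is exactly what makes
    /// skipping on `FALSE AND NULL` safe.
    fn kleene_and(a: Option<bool>, b: Option<bool>) -> Option<bool> {
        match (a, b) {
            (Some(false), _) | (_, Some(false)) => Some(false),
            (Some(true), Some(true)) => Some(true),
            _ => None, // any remaining NULL leaves the result unknown
        }
    }

    fn main() {
        assert_eq!(kleene_and(Some(false), None), Some(false)); // can still skip the file
        assert_eq!(kleene_and(Some(true), None), None);         // must keep the file
    }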

Complicated stuff! For more details, see extensive code comments including:
https://github.com/delta-io/delta-kernel-rs/blob/main/kernel/src/predicates/mod.rs#L25-L44
https://github.com/delta-io/delta-kernel-rs/blob/main/kernel/src/predicates/parquet_stats_skipping.rs#L109-L153

Collaborator

@scovich scovich Jan 8, 2025

I should add, data skipping and SQL WHERE have conflicting requirements:

  • Data skipping must keep the file unless the predicate returns an actual FALSE
  • WHERE discards the row unless the predicate returns an actual TRUE

For partition pruning, we should in theory use SQL WHERE semantics -- but that's only safe if we're certain that every column the predicate references is a partition column, which is not guaranteed. Hence my compromise suggestion, that we should ensure the partition values map always has a value (Scalar::Null by default) for every partition column.

Additionally, we will probably want to hoist the eval_sql_where function from ParquetStatsSkippingFilter (which can probably go away) up to the PredicateEvaluator trait so you can use it. That way, the required IS NOT NULL checks would be injected automatically. That would also allow us to simplify the can_statically_skip_all_files function in scan/mod.rs, because it could use a DefaultPredicateEvaluator (with an empty column resolver) instead of weirdly relying on parquet stats skipping logic.

Author

Thank you for taking the time to write this all up. From reading the delta spec, specifically this line in requirements for writers:

Values for all partition columns present in the schema MUST be present for all files in the table.

This would lead me to think that the right thing to do is to return an error if any of the partitionValues are entirely missing, because that doesn't match the spec.

Also from my reading of the spec it looks like null values should be an empty string per partition value serialization. The acceptance tests currently have things like "partitionValues":{"letter":"a","date":null,"data":"x"} which is also how my other examples show up. But to meet the spec it looks like we need to also support the empty string case. I don't immediately see any examples in the repo that have this for a unit test, so I'll be sure to add both variants (null and "").
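A sketch of a parser that tolerates both variants; whether "" must also map to NULL (and for which types) is exactly the open spec question above, so this is illustrative only:

    use delta_kernel::expressions::Scalar;
    use delta_kernel::schema::{DataType, PrimitiveType};
    use delta_kernel::DeltaResult;

    /// Sketch: funnel both observed null encodings into Scalar::Null. The ""
    /// branch follows one reading of the spec and may need refining for
    /// string-typed partition columns, where "" is a legitimate value.
    fn parse_partition_value(
        pt: &PrimitiveType,
        raw: Option<&str>, // None models "col": null in the JSON
        data_type: &DataType,
    ) -> DeltaResult<Scalar> {
        match raw {
            None | Some("") => Ok(Scalar::Null(data_type.clone())),
            Some(v) => pt.parse_scalar(v),
        }
    }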

Collaborator

Hoisting eval_sql_where up so DefaultPredicateEvaluator can use it: #627

Collaborator

Thank you for taking the time to write this all up. From reading the delta spec, specifically this line in requirements for writers:

Values for all partition columns present in the schema MUST be present for all files in the table.

This would lead me to think that the right thing to do is to return an error if any of the partitionValues are entirely missing, because that doesn't match the spec.

That sounds right to me as well. However:

  1. We may want to ask delta-rs folks if they've seen other behaviors in the wild that we need to consider tolerating.
  2. We still have to worry about the non-partition columns the predicate might mention -- they won't have entries in add.partitionValues and we want them to resolve as None (not Scalar::Null).

Maybe the simplest thing to do -- in this partition skipping code at least -- is to treat missing partition values as if they were value columns. They would simply not participate in skipping, and the query that comes later can error out as appropriate.
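With the map-backed resolver, that compromise falls out naturally: a column simply absent from the map resolves to None, so comparisons against it yield NULL and the file is kept. Roughly (a fragment; the `Scalar::from` conversion and map-based `DefaultPredicateEvaluator::from` follow the snippets earlier in this thread):

    // Sketch: only insert what we actually know. Anything absent resolves to
    // None in the evaluator, yielding NULL and forcing the file to be kept;
    // the downstream query can then error out on genuinely corrupt data.
    let mut partition_values: HashMap<ColumnName, Scalar> = HashMap::new();
    partition_values.insert(ColumnName::new(["letter"]), Scalar::from("a")); // parsed value
    partition_values.insert(
        ColumnName::new(["date"]),
        Scalar::Null(DataType::DATE), // known NULL from an explicit null entry
    );
    // Missing/corrupt entry: insert nothing at all.
    let filter = DefaultPredicateEvaluator::from(partition_values);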

@timsaucer
Author

Thank you for all the thoughtful feedback. I have to step away from this for a few days to focus on other things, but it is high priority for me because it will greatly impact many of my workflows.

@timsaucer timsaucer marked this pull request as ready for review January 11, 2025 15:30
@timsaucer
Author

I've added a unit test. Aside from passing the selection vector around to reduce work, are there other show stoppers anyone sees remaining?

@timsaucer timsaucer changed the title from "[WIP] partition skipping filter" to "partition skipping filter" Jan 11, 2025
@scovich scovich left a comment (Collaborator)

Aside from passing the selection vector around to reduce work, are there other show stoppers anyone sees remaining?

Show stoppers -- no. I added some comments on some potential ways to simplify and harden the code tho.



    ));
};

let scalar = partition_values
    .get(field.name())
    .map(|v| primitive_type.parse_scalar(v))
    .transpose()?
    .unwrap_or(Scalar::Null(data_type.clone()));
Collaborator

It's only correct to substitute NULL if the partition column is nullable. Otherwise, the table (or our reading of it) is corrupt and we should error out rather than return incorrect data.
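That check might look something like this, assuming a `StructField::is_nullable` accessor and the `Error::generic` constructor (names otherwise follow the diff above; this is a loop-body fragment, not final code):

    // Sketch: substitute NULL only for nullable partition columns; a missing
    // value for a non-nullable column means the table (or our read of it) is
    // corrupt, so error out rather than return wrong data.
    let scalar = match partition_values.get(field.name()) {
        Some(v) => primitive_type.parse_scalar(v)?,
        None if field.is_nullable() => Scalar::Null(data_type.clone()),
        None => {
            return Err(Error::generic(format!(
                "missing value for non-nullable partition column '{}'",
                field.name()
            )))
        }
    };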

    .fields()
    .filter(|f| partition_columns.contains(f.name()))
    .cloned();
let schema = Arc::new(StructType::new(partition_fields));
Collaborator

If the predicate doesn't reference any partition columns, the filtered schema will be empty here and we should return None:

Suggested change
- let schema = Arc::new(StructType::new(partition_fields));
+ if partition_fields.is_empty() {
+     return None;
+ }
+ let schema = Arc::new(StructType::new(partition_fields));

As a bonus, that check covers the case of a non-partitioned table, which means we can remove the existing call site check for that condition in log_replay.rs (above). It also means we don't have to worry about other call sites forgetting to check. Double bonus of simpler and less error-prone code.

Comment on lines +209 to +211
let partition_predicate = physical_predicate
    .as_ref()
    .filter(|_| !partition_columns.is_empty());
Collaborator

See below, we can probably get rid of this check.


