Add test for ordering of predicate pushdown into parquet #16169

Status: Open. Wants to merge 6 commits into `main`.
45 changes: 14 additions & 31 deletions datafusion/datasource-parquet/src/source.rs
```diff
@@ -41,7 +41,6 @@ use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
-use datafusion_physical_plan::filter_pushdown::PredicateSupport;
 use datafusion_physical_plan::filter_pushdown::PredicateSupports;
 use datafusion_physical_plan::metrics::Count;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -613,22 +612,15 @@ impl FileSource for ParquetSource {
         let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;
 
         let mut source = self.clone();
-        let mut allowed_filters = vec![];
-        let mut remaining_filters = vec![];
-        for filter in &filters {
-            if can_expr_be_pushed_down_with_schemas(filter, &file_schema) {
-                // This filter can be pushed down
-                allowed_filters.push(Arc::clone(filter));
-            } else {
-                // This filter cannot be pushed down
-                remaining_filters.push(Arc::clone(filter));
-            }
-        }
-        if allowed_filters.is_empty() {
+        let filters = PredicateSupports::new_with_supported_check(filters, |filter| {
+            can_expr_be_pushed_down_with_schemas(filter, &file_schema)
+        });
+        if filters.is_all_unsupported() {
             // No filters can be pushed down, so we can just return the remaining filters
             // and avoid replacing the source in the physical plan.
-            return Ok(FilterPushdownPropagation::unsupported(filters));
+            return Ok(FilterPushdownPropagation::with_filters(filters));
         }
+        let allowed_filters = filters.collect_supported();
         let predicate = match source.predicate {
             Some(predicate) => conjunction(
                 std::iter::once(predicate).chain(allowed_filters.iter().cloned()),
@@ -637,23 +629,14 @@
         };
         source.predicate = Some(predicate);
         let source = Arc::new(source);
-        let filters = PredicateSupports::new(
-            allowed_filters
-                .into_iter()
-                .map(|f| {
-                    if pushdown_filters {
-                        PredicateSupport::Supported(f)
-                    } else {
-                        PredicateSupport::Unsupported(f)
-                    }
-                })
-                .chain(
-                    remaining_filters
-                        .into_iter()
-                        .map(PredicateSupport::Unsupported),
-                )
-                .collect(),
-        );
+        // If pushdown_filters is false we tell our parents that they still have to handle the filters,
+        // even if we updated the predicate to include the filters (they will only be used for stats pruning).
+        if !pushdown_filters {
+            return Ok(FilterPushdownPropagation::with_filters(
+                filters.make_unsupported(),
+            )
+            .with_updated_node(source));
+        }
         Ok(FilterPushdownPropagation::with_filters(filters).with_updated_node(source))
     }
 }
```
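The net effect of this refactor is that filter classification, the all-unsupported early-out, and the collection of pushable filters all go through one API instead of a hand-rolled two-vector loop. Below is a minimal, self-contained sketch of that control flow; `Support`, the string predicates, and `can_push` are stand-ins invented for illustration, not DataFusion's real `PredicateSupport` / `PhysicalExpr` types:

```rust
// Self-contained sketch of the new flow in `try_pushdown_filters`.
// `Support` stands in for `PredicateSupport`; strings stand in for
// `Arc<dyn PhysicalExpr>`. Nothing here is DataFusion's real API.

#[derive(Debug)]
enum Support {
    Supported(String),
    Unsupported(String),
}

/// Mirrors `PredicateSupports::new_with_supported_check`: one callback
/// classifies each filter, preserving the input order.
fn classify(filters: Vec<String>, can_push: impl Fn(&str) -> bool) -> Vec<Support> {
    filters
        .into_iter()
        .map(|f| {
            if can_push(&f) {
                Support::Supported(f)
            } else {
                Support::Unsupported(f)
            }
        })
        .collect()
}

fn main() {
    let filters = vec!["val = 'a'".to_string(), "val != part".to_string()];

    // Stand-in for `can_expr_be_pushed_down_with_schemas`.
    let classified = classify(filters, |f| !f.contains("part"));

    // Mirrors `is_all_unsupported`: bail out early and leave the source as-is.
    if classified.iter().all(|s| matches!(s, Support::Unsupported(_))) {
        println!("nothing to push down");
        return;
    }

    // Mirrors `collect_supported`: only supported filters join the predicate.
    let supported: Vec<&String> = classified
        .iter()
        .filter_map(|s| match s {
            Support::Supported(e) => Some(e),
            Support::Unsupported(_) => None,
        })
        .collect();
    println!("pushed into the scan predicate: {supported:?}");
}
```

Keeping classification in one ordered map, rather than splitting into `allowed` and `remaining` vectors, is also what makes the order of the returned filters predictable, which the new sqllogictest below exercises.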
13 changes: 13 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
```diff
@@ -536,7 +536,20 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///
     /// The default implementation is a no-op that passes the result of pushdown from the children to its parent.
     ///
+    /// When returning filters via [`FilterPushdownPropagation`] the order of the filters need not match
+    /// the order they were passed in via `child_pushdown_result`, but preserving the order may be beneficial
+    /// for debugging and reasoning about the resulting plans, so it is recommended to preserve the order.
+    ///
+    /// There are various helper methods to make implementing this method easier; see:
+    /// - [`FilterPushdownPropagation::unsupported`]: to indicate that the node does not support filter pushdown at all.
+    /// - [`FilterPushdownPropagation::transparent`]: to indicate that the node supports filter pushdown but does not involve itself in it;
+    ///   instead it simply transmits the result of pushdown into its children back up to its parent.
+    /// - [`PredicateSupports::new_with_supported_check`]: takes a callback that returns true / false for each filter to indicate pushdown support.
+    ///   This can be used alongside [`FilterPushdownPropagation::with_filters`] and [`FilterPushdownPropagation::with_updated_node`]
+    ///   to dynamically build a result with a mix of supported and unsupported filters.
+    ///
     /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
+    /// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
     fn handle_child_pushdown_result(
         &self,
         child_pushdown_result: ChildPushdownResult,
```
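The ordering guidance can be exercised with the helper the doc comment points at: `new_with_supported_check` maps over its input in order, so the classified filters line up one-to-one with what the parent passed down. A small sketch, assuming the crate paths used in this PR's imports, that `lit` literals are acceptable throwaway predicates, and that the `IntoIterator` impl shown in the next file yields `PredicateSupport` items:

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::{PredicateSupport, PredicateSupports};

fn main() {
    // Three throwaway predicates standing in for filters pushed down by a parent.
    let filters: Vec<Arc<dyn PhysicalExpr>> = vec![lit(1i32), lit(2i32), lit(3i32)];

    // Classify by content: only "2" is rejected. The helper tags each filter
    // in place, so the result is still ordered [1, 2, 3], which is exactly the
    // order-preserving behavior the new documentation recommends.
    let supports =
        PredicateSupports::new_with_supported_check(filters, |f| f.to_string() != "2");

    for support in supports {
        match support {
            PredicateSupport::Supported(expr) => println!("supported:   {expr}"),
            PredicateSupport::Unsupported(expr) => println!("unsupported: {expr}"),
        }
    }
}
```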
55 changes: 55 additions & 0 deletions datafusion/physical-plan/src/filter_pushdown.rs
```diff
@@ -31,6 +31,16 @@ pub enum PredicateSupport {
     Unsupported(Arc<dyn PhysicalExpr>),
 }
 
+impl PredicateSupport {
+    pub fn into_inner(self) -> Arc<dyn PhysicalExpr> {
+        match self {
+            PredicateSupport::Supported(expr) | PredicateSupport::Unsupported(expr) => {
+                expr
+            }
+        }
+    }
+}
+
 /// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of
 /// supported and unsupported filters. Inner vector stores each predicate for one node.
 #[derive(Debug, Clone)]
@@ -60,6 +70,25 @@ impl PredicateSupports {
         Self::new(pushdowns)
     }
 
+    /// Create a new [`PredicateSupports`] with filters marked as supported if
+    /// `check` returns true and unsupported otherwise.
+    pub fn new_with_supported_check(
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        check: impl Fn(&Arc<dyn PhysicalExpr>) -> bool,
+    ) -> Self {
+        let pushdowns = filters
+            .into_iter()
+            .map(|f| {
+                if check(&f) {
+                    PredicateSupport::Supported(f)
+                } else {
+                    PredicateSupport::Unsupported(f)
+                }
+            })
+            .collect();
+        Self::new(pushdowns)
+    }
+
     /// Transform all filters to supported, returning a new [`PredicateSupports`]
     /// with all filters as [`PredicateSupport::Supported`].
     /// This does not modify the original [`PredicateSupport`].
@@ -102,6 +131,18 @@
             .collect()
     }
 
+    /// Collect supported filters into a Vec, without removing them from the original
+    /// [`PredicateSupports`].
+    pub fn collect_supported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .iter()
+            .filter_map(|f| match f {
+                PredicateSupport::Supported(expr) => Some(Arc::clone(expr)),
+                PredicateSupport::Unsupported(_) => None,
+            })
+            .collect()
+    }
+
     /// Collect all filters into a Vec, without removing them from the original
     /// FilterPushdowns.
     pub fn collect_all(self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -132,6 +173,20 @@
     pub fn is_empty(&self) -> bool {
         self.0.is_empty()
     }
+
+    /// Check if all filters are supported.
+    pub fn is_all_supported(&self) -> bool {
+        self.0
+            .iter()
+            .all(|f| matches!(f, PredicateSupport::Supported(_)))
+    }
+
+    /// Check if all filters are unsupported.
+    pub fn is_all_unsupported(&self) -> bool {
+        self.0
+            .iter()
+            .all(|f| matches!(f, PredicateSupport::Unsupported(_)))
+    }
 }
 
 impl IntoIterator for PredicateSupports {
```
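Taken together, the new methods give callers a build / inspect / collect / unwrap cycle. A hedged usage sketch of the additions above, again using `lit` as a throwaway predicate and the crate paths from this PR's imports:

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::{PredicateSupport, PredicateSupports};

fn main() {
    let filters: Vec<Arc<dyn PhysicalExpr>> = vec![lit(true), lit(false)];

    // Build: classify every filter with one callback. This toy check marks
    // everything supported; ParquetSource plugs in
    // `can_expr_be_pushed_down_with_schemas` here instead.
    let supports = PredicateSupports::new_with_supported_check(filters, |_| true);

    // Inspect: the two new aggregate checks.
    assert!(supports.is_all_supported());
    assert!(!supports.is_all_unsupported());

    // Collect: borrow out only the supported expressions (clones the Arcs).
    let supported = supports.collect_supported();
    assert_eq!(supported.len(), 2);

    // Unwrap: `into_inner` discards the Supported/Unsupported tag.
    let expr = PredicateSupport::Supported(Arc::clone(&supported[0])).into_inner();
    println!("{expr}");
}
```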
35 changes: 35 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
```diff
@@ -246,3 +246,38 @@ physical_plan
 02)--FilterExec: val@0 != part@1
 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != d AND val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != d OR d != val_max@1) AND val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c, d)]
+
+# The order of filters should not matter
+query TT
+EXPLAIN select val, part from t_pushdown where part = 'a' AND part = val;
+----
+logical_plan
+01)Filter: t_pushdown.val = t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 = part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+
+query TT
+select val, part from t_pushdown where part = 'a' AND part = val;
+----
+a a
+
+query TT
+EXPLAIN select val, part from t_pushdown where part = val AND part = 'a';
+----
+logical_plan
+01)Filter: t_pushdown.val = t_pushdown.part
+02)--TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part = Utf8("a")], partial_filters=[t_pushdown.val = t_pushdown.part]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=8192
+02)--FilterExec: val@0 = part@1
+03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet]]}, projection=[val, part], file_type=parquet
+
+query TT
+select val, part from t_pushdown where part = val AND part = 'a';
+----
+a a
```