Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
use datafusion::parquet::schema::types::ColumnPath;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::prelude::*;
Expand Down Expand Up @@ -301,8 +301,11 @@ impl IndexTableProvider {
// In this example, we use the PruningPredicate's literal guarantees to
// analyze the predicate. In a real system, using
// `PruningPredicate::prune` would likely be easier to do.
let pruning_predicate =
PruningPredicate::try_new(Arc::clone(predicate), self.schema())?;
let pruning_predicate = PruningPredicate::try_new(
Arc::clone(predicate),
self.schema(),
&PruningPredicateConfig::default(),
)?;

// The PruningPredicate's guarantees must all be satisfied in order for
// the predicate to possibly evaluate to true.
Expand Down
9 changes: 6 additions & 3 deletions datafusion-examples/examples/data_io/parquet_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use datafusion::parquet::arrow::{
ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder,
};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use std::any::Any;
Expand Down Expand Up @@ -367,8 +367,11 @@ impl ParquetMetadataIndex {
) -> Result<Vec<(&str, u64)>> {
// Use the PruningPredicate API to determine which files cannot
// possibly have any relevant data.
let pruning_predicate =
PruningPredicate::try_new(predicate, self.schema().clone())?;
let pruning_predicate = PruningPredicate::try_new(
predicate,
self.schema().clone(),
&PruningPredicateConfig::default(),
)?;

// Now evaluate the pruning predicate into a boolean mask, one element per
// file in the index. If the mask is true, the file may have rows that
Expand Down
9 changes: 7 additions & 2 deletions datafusion-examples/examples/query_planning/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use datafusion::common::{DFSchema, ScalarValue};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
use datafusion::prelude::*;

/// This example shows how to use DataFusion's `PruningPredicate` to prove
Expand Down Expand Up @@ -195,7 +195,12 @@ fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate
let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
PruningPredicate::try_new(physical_expr, Arc::clone(schema)).unwrap()
PruningPredicate::try_new(
physical_expr,
Arc::clone(schema),
&PruningPredicateConfig::default(),
)
.unwrap()
}

fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,11 @@ config_namespace! {
/// the parquet file
pub pruning: bool, default = true

/// (reading) Maximum number of elements (inclusive) in InList exprs to be eligible for pruning.
/// When some InList exprs contain more than this threshold, these expressions are ignored during pruning,
/// but other expressions may still be used for pruning.
pub pruning_max_inlist_limit: usize, default = 20

/// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
/// the file Schema. This setting can help avoid schema conflicts when querying
/// multiple parquet files with schemas containing compatible types but different metadata
Expand Down
4 changes: 4 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl ParquetOptions {
// not in WriterProperties
enable_page_index: _,
pruning: _,
pruning_max_inlist_limit: _,
skip_metadata: _,
metadata_size_hint: _,
pushdown_filters: _,
Expand Down Expand Up @@ -444,6 +445,7 @@ mod tests {
// not in WriterProperties, but itemizing here to not skip newly added props
enable_page_index: defaults.enable_page_index,
pruning: defaults.pruning,
pruning_max_inlist_limit: defaults.pruning_max_inlist_limit,
skip_metadata: defaults.skip_metadata,
metadata_size_hint: defaults.metadata_size_hint,
pushdown_filters: defaults.pushdown_filters,
Expand Down Expand Up @@ -556,6 +558,8 @@ mod tests {
// not in WriterProperties
enable_page_index: global_options_defaults.enable_page_index,
pruning: global_options_defaults.pruning,
pruning_max_inlist_limit: global_options_defaults
.pruning_max_inlist_limit,
skip_metadata: global_options_defaults.skip_metadata,
metadata_size_hint: global_options_defaults.metadata_size_hint,
pushdown_filters: global_options_defaults.pushdown_filters,
Expand Down
20 changes: 18 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ use datafusion_physical_expr_common::physical_expr::{
use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
};
use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
use datafusion_pruning::{
FilePruner, PruningPredicate, PruningPredicateConfig, build_pruning_predicate,
};

use crate::sort::reverse_row_selection;
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -104,6 +106,8 @@ pub(super) struct ParquetOpener {
pub enable_bloom_filter: bool,
/// Should row group pruning be applied
pub enable_row_group_stats_pruning: bool,
/// Maximum number of elements (inclusive) in InList exprs to be eligible for pruning
pub pruning_max_inlist_limit: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could just make `PruningPredicateConfig` a field here instead of polluting the struct with more fields. Also, can this be `pub(crate)` instead of `pub`?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what you had in mind? 0f6cdeb

/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
/// Optional parquet FileDecryptionProperties
Expand Down Expand Up @@ -280,6 +284,9 @@ impl FileOpener for ParquetOpener {

let reverse_row_groups = self.reverse_row_groups;
let preserve_order = self.preserve_order;
let pruning_predicate_config = PruningPredicateConfig {
max_in_list: self.pruning_max_inlist_limit,
};

Ok(Box::pin(async move {
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -326,6 +333,7 @@ impl FileOpener for ParquetOpener {
&logical_file_schema,
&partitioned_file,
predicate_creation_errors.clone(),
pruning_predicate_config.clone(),
)
});

Expand Down Expand Up @@ -426,6 +434,7 @@ impl FileOpener for ParquetOpener {
predicate.as_ref(),
&physical_file_schema,
&predicate_creation_errors,
&pruning_predicate_config,
);

// The page index is not stored inline in the parquet footer so the
Expand Down Expand Up @@ -513,6 +522,7 @@ impl FileOpener for ParquetOpener {
rg_metadata,
predicate,
&file_metrics,
&pruning_predicate_config,
);
} else {
// Update metrics: statistics unavailable, so all row groups are
Expand Down Expand Up @@ -938,17 +948,20 @@ fn create_initial_plan(
/// Builds the page-level pruning filter for `predicate` against `file_schema`.
///
/// Delegates to `PagePruningAccessPlanFilter::new`, forwarding `config`
/// unchanged, and returns the resulting filter wrapped in an `Arc`.
pub(crate) fn build_page_pruning_predicate(
    predicate: &Arc<dyn PhysicalExpr>,
    file_schema: &SchemaRef,
    config: &PruningPredicateConfig,
) -> Arc<PagePruningAccessPlanFilter> {
    // The filter constructor takes the schema by value, so hand it its own
    // reference-counted handle.
    let schema = Arc::clone(file_schema);
    let page_filter = PagePruningAccessPlanFilter::new(predicate, schema, config);
    Arc::new(page_filter)
}

pub(crate) fn build_pruning_predicates(
predicate: Option<&Arc<dyn PhysicalExpr>>,
file_schema: &SchemaRef,
predicate_creation_errors: &Count,
config: &PruningPredicateConfig,
) -> (
Option<Arc<PruningPredicate>>,
Option<Arc<PagePruningAccessPlanFilter>>,
Expand All @@ -960,8 +973,10 @@ pub(crate) fn build_pruning_predicates(
Arc::clone(predicate),
file_schema,
predicate_creation_errors,
config,
);
let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
let page_pruning_predicate =
build_page_pruning_predicate(predicate, file_schema, config);
(pruning_predicate, Some(page_pruning_predicate))
}

Expand Down Expand Up @@ -1185,6 +1200,7 @@ mod test {
enable_page_index: self.enable_page_index,
enable_bloom_filter: self.enable_bloom_filter,
enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
pruning_max_inlist_limit: 20,
coerce_int96: self.coerce_int96,
#[cfg(feature = "parquet_encryption")]
file_decryption_properties: None,
Expand Down
9 changes: 7 additions & 2 deletions datafusion/datasource-parquet/src/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use arrow::{
use datafusion_common::ScalarValue;
use datafusion_common::pruning::PruningStatistics;
use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
use datafusion_pruning::PruningPredicate;
use datafusion_pruning::{PruningPredicate, PruningPredicateConfig};

use log::{debug, trace};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
Expand Down Expand Up @@ -119,14 +119,19 @@ impl PagePruningAccessPlanFilter {
/// Create a new [`PagePruningAccessPlanFilter`] from a physical
/// expression.
#[expect(clippy::needless_pass_by_value)]
pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
pub fn new(
expr: &Arc<dyn PhysicalExpr>,
schema: SchemaRef,
config: &PruningPredicateConfig,
) -> Self {
// extract any single column predicates
let predicates = split_conjunction(expr)
.into_iter()
.filter_map(|predicate| {
let pp = match PruningPredicate::try_new(
Arc::clone(predicate),
Arc::clone(&schema),
config,
) {
Ok(pp) => pp,
Err(e) => {
Expand Down
Loading
Loading