@@ -48,7 +48,7 @@ use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}
 use datafusion::parquet::schema::types::ColumnPath;
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_expr::utils::{Guarantee, LiteralGuarantee};
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::prelude::*;
@@ -301,8 +301,11 @@ impl IndexTableProvider {
         // In this example, we use the PruningPredicate's literal guarantees to
         // analyze the predicate. In a real system, using
         // `PruningPredicate::prune` would likely be easier to do.
-        let pruning_predicate =
-            PruningPredicate::try_new(Arc::clone(predicate), self.schema())?;
+        let pruning_predicate = PruningPredicate::try_new(
+            Arc::clone(predicate),
+            self.schema(),
+            &PruningPredicateConfig::default(),
+        )?;
 
         // The PruningPredicate's guarantees must all be satisfied in order for
         // the predicate to possibly evaluate to true.
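As a quick way to exercise the new three-argument `try_new` outside this example, here is a minimal standalone sketch (not part of the PR): it lowers a logical predicate to a physical expression the same way these examples do, passes the default `PruningPredicateConfig`, and prints the literal guarantees the example's index matching relies on. `literal_guarantees()` is the existing accessor on `PruningPredicate`; the schema and predicate are invented for illustration.

```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
use datafusion::prelude::*;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let df_schema = DFSchema::try_from(Arc::clone(&schema))?;

    // Logical predicate `id = 1 OR id = 5`, lowered to a physical expression
    let expr = col("id").eq(lit(1)).or(col("id").eq(lit(5)));
    let physical = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;

    // New signature: the third argument carries pruning internals such as
    // the InList size limit added by this PR
    let pruning_predicate = PruningPredicate::try_new(
        physical,
        Arc::clone(&schema),
        &PruningPredicateConfig::default(),
    )?;

    // Guarantees like "id must be one of {1, 5}" can then be matched
    // against a file-level index, as the example above does
    println!("{:?}", pruning_predicate.literal_guarantees());
    Ok(())
}
```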
datafusion-examples/examples/data_io/parquet_index.rs (6 additions, 3 deletions)
@@ -42,7 +42,7 @@ use datafusion::parquet::arrow::{
     ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder,
 };
 use datafusion::physical_expr::PhysicalExpr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::*;
 use std::any::Any;
@@ -367,8 +367,11 @@ impl ParquetMetadataIndex {
     ) -> Result<Vec<(&str, u64)>> {
         // Use the PruningPredicate API to determine which files can not
         // possibly have any relevant data.
-        let pruning_predicate =
-            PruningPredicate::try_new(predicate, self.schema().clone())?;
+        let pruning_predicate = PruningPredicate::try_new(
+            predicate,
+            self.schema().clone(),
+            &PruningPredicateConfig::default(),
+        )?;
 
         // Now evaluate the pruning predicate into a boolean mask, one element per
         // file in the index. If the mask is true, the file may have rows that
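The boolean mask described in the comment above is produced by `PruningPredicate::prune`, which consults per-container statistics through DataFusion's `PruningStatistics` trait. A sketch of a minimal statistics source follows; the struct and its single hard-coded column `a` are invented for illustration, while the six trait methods are the real API. Returning `None` means "statistic unknown", which keeps pruning conservative.

```rust
use std::collections::HashSet;
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, BooleanArray, Int32Array};
use datafusion::common::pruning::PruningStatistics;
use datafusion::common::{Column, ScalarValue};
use datafusion::error::Result;
use datafusion::physical_optimizer::pruning::PruningPredicate;

/// Min/max values for one Int32 column "a", one entry per container (file)
struct ExampleStats {
    mins: Int32Array,
    maxes: Int32Array,
}

impl PruningStatistics for ExampleStats {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        (column.name == "a").then(|| Arc::new(self.mins.clone()) as ArrayRef)
    }
    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        (column.name == "a").then(|| Arc::new(self.maxes.clone()) as ArrayRef)
    }
    fn num_containers(&self) -> usize {
        self.mins.len()
    }
    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None // unknown: pruning stays conservative
    }
    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }
    fn contained(
        &self,
        _column: &Column,
        _values: &HashSet<ScalarValue>,
    ) -> Option<BooleanArray> {
        None
    }
}

/// For files covering [0,9], [10,19], [20,29], a predicate such as `a = 15`
/// should yield the mask [false, true, false]: only the second file survives
fn prune_files(pruning_predicate: &PruningPredicate) -> Result<Vec<bool>> {
    let stats = ExampleStats {
        mins: Int32Array::from(vec![0, 10, 20]),
        maxes: Int32Array::from(vec![9, 19, 29]),
    };
    pruning_predicate.prune(&stats)
}
```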
datafusion-examples/examples/query_planning/pruning.rs (7 additions, 2 deletions)
@@ -27,7 +27,7 @@ use datafusion::common::{DFSchema, ScalarValue};
 use datafusion::error::Result;
 use datafusion::execution::context::ExecutionProps;
 use datafusion::physical_expr::create_physical_expr;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
 use datafusion::prelude::*;
 
 /// This example shows how to use DataFusion's `PruningPredicate` to prove
@@ -195,7 +195,12 @@ fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate
     let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
     let props = ExecutionProps::new();
     let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
-    PruningPredicate::try_new(physical_expr, Arc::clone(schema)).unwrap()
+    PruningPredicate::try_new(
+        physical_expr,
+        Arc::clone(schema),
+        &PruningPredicateConfig::default(),
+    )
+    .unwrap()
 }
 
 fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
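A hypothetical call to the updated helper, as it might appear alongside this example's other code (it relies on the example's existing imports; the column and bound are invented, and `always_true` is the existing accessor that reports when a predicate can never prune anything):

```rust
let schema: SchemaRef =
    Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, true)]));
// `x > 5` is prunable: min/max statistics can prove it false for a container
let pruning_predicate = create_pruning_predicate(col("x").gt(lit(5)), &schema);
assert!(!pruning_predicate.always_true());
```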
datafusion/common/src/config.rs (9 additions)
@@ -691,6 +691,15 @@ config_namespace! {
         /// the parquet file
         pub pruning: bool, default = true
 
+        /// (reading) Maximum number of elements (inclusive) an `InList` expression may
+        /// contain and still be eligible for statistics pruning. Larger `InList`
+        /// expressions are ignored during statistics pruning, but other expressions may
+        /// still be used for pruning. A skipped `InList` expression is not ignored
+        /// altogether: it is still applied as a filter at the data / per-row level.
+        /// This does not impact [`ParquetOptions::pushdown_filters`]; when that option
+        /// is enabled, large `InList` expressions are always evaluated against each row.
+        pub pruning_max_inlist_limit: usize, default = 20
+
         /// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
         /// the file Schema. This setting can help avoid schema conflicts when querying
         /// multiple parquet files with schemas containing compatible types but different metadata
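The new option should be settable like any other parquet reader option. A sketch, assuming the key follows the `datafusion.execution.parquet` namespace that `pruning` lives in (worth verifying against the generated configuration docs):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Allow IN lists of up to 64 elements to participate in statistics pruning
    let config = SessionConfig::new()
        .set_usize("datafusion.execution.parquet.pruning_max_inlist_limit", 64);
    let ctx = SessionContext::new_with_config(config);

    // Equivalent SQL form
    ctx.sql("SET datafusion.execution.parquet.pruning_max_inlist_limit = 64")
        .await?;
    Ok(())
}
```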
datafusion/common/src/file_options/parquet_writer.rs (4 additions)
@@ -195,6 +195,7 @@ impl ParquetOptions {
             // not in WriterProperties
             enable_page_index: _,
             pruning: _,
+            pruning_max_inlist_limit: _,
             skip_metadata: _,
             metadata_size_hint: _,
             pushdown_filters: _,
@@ -444,6 +445,7 @@ mod tests {
             // not in WriterProperties, but itemizing here to not skip newly added props
             enable_page_index: defaults.enable_page_index,
             pruning: defaults.pruning,
+            pruning_max_inlist_limit: defaults.pruning_max_inlist_limit,
             skip_metadata: defaults.skip_metadata,
             metadata_size_hint: defaults.metadata_size_hint,
             pushdown_filters: defaults.pushdown_filters,
@@ -556,6 +558,8 @@ mod tests {
             // not in WriterProperties
             enable_page_index: global_options_defaults.enable_page_index,
             pruning: global_options_defaults.pruning,
+            pruning_max_inlist_limit: global_options_defaults
+                .pruning_max_inlist_limit,
             skip_metadata: global_options_defaults.skip_metadata,
             metadata_size_hint: global_options_defaults.metadata_size_hint,
             pushdown_filters: global_options_defaults.pushdown_filters,
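These test updates are forced by the exhaustive-destructuring pattern this file uses: because the `let ParquetOptions { ... }` patterns have no `..` rest, adding `pruning_max_inlist_limit` fails compilation until every site names the field, so new options cannot be silently skipped. A toy illustration of the pattern (types invented):

```rust
struct Options {
    pruning: bool,
    pruning_max_inlist_limit: usize,
}

fn itemize(opts: &Options) {
    // No `..` rest pattern: adding a field to Options turns this into a
    // compile error until the new field is listed here as well
    let Options {
        pruning: _,
        pruning_max_inlist_limit: _,
    } = opts;
}

fn main() {
    itemize(&Options { pruning: true, pruning_max_inlist_limit: 20 });
}
```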
datafusion/datasource-parquet/src/opener.rs (17 additions, 2 deletions)
@@ -49,7 +49,9 @@ use datafusion_physical_expr_common::physical_expr::{
 use datafusion_physical_plan::metrics::{
     Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
 };
-use datafusion_pruning::{FilePruner, PruningPredicate, build_pruning_predicate};
+use datafusion_pruning::{
+    FilePruner, PruningPredicate, PruningPredicateConfig, build_pruning_predicate,
+};
 
 use crate::sort::reverse_row_selection;
 #[cfg(feature = "parquet_encryption")]
@@ -104,6 +106,8 @@ pub(super) struct ParquetOpener {
     pub enable_bloom_filter: bool,
     /// Should row group pruning be applied
     pub enable_row_group_stats_pruning: bool,
+    /// Internal configuration for predicate pruning
+    pub(crate) pruning_predicate_config: PruningPredicateConfig,
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
     /// Optional parquet FileDecryptionProperties
@@ -280,6 +284,7 @@ impl FileOpener for ParquetOpener {
 
         let reverse_row_groups = self.reverse_row_groups;
         let preserve_order = self.preserve_order;
+        let pruning_predicate_config = self.pruning_predicate_config.clone();
 
         Ok(Box::pin(async move {
             #[cfg(feature = "parquet_encryption")]
@@ -326,6 +331,7 @@
                     &logical_file_schema,
                     &partitioned_file,
                     predicate_creation_errors.clone(),
+                    pruning_predicate_config.clone(),
                 )
             });
 
@@ -426,6 +432,7 @@
                 predicate.as_ref(),
                 &physical_file_schema,
                 &predicate_creation_errors,
+                &pruning_predicate_config,
             );
 
             // The page index is not stored inline in the parquet footer so the
@@ -513,6 +520,7 @@
                         rg_metadata,
                         predicate,
                         &file_metrics,
+                        &pruning_predicate_config,
                     );
                 } else {
                     // Update metrics: statistics unavailable, so all row groups are
@@ -938,17 +946,20 @@ fn create_initial_plan(
 pub(crate) fn build_page_pruning_predicate(
     predicate: &Arc<dyn PhysicalExpr>,
     file_schema: &SchemaRef,
+    config: &PruningPredicateConfig,
 ) -> Arc<PagePruningAccessPlanFilter> {
     Arc::new(PagePruningAccessPlanFilter::new(
         predicate,
         Arc::clone(file_schema),
+        config,
     ))
 }
 
 pub(crate) fn build_pruning_predicates(
     predicate: Option<&Arc<dyn PhysicalExpr>>,
     file_schema: &SchemaRef,
     predicate_creation_errors: &Count,
+    config: &PruningPredicateConfig,
 ) -> (
     Option<Arc<PruningPredicate>>,
     Option<Arc<PagePruningAccessPlanFilter>>,
@@ -960,8 +971,10 @@ pub(crate) fn build_pruning_predicates(
         Arc::clone(predicate),
         file_schema,
         predicate_creation_errors,
+        config,
     );
-    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
+    let page_pruning_predicate =
+        build_page_pruning_predicate(predicate, file_schema, config);
     (pruning_predicate, Some(page_pruning_predicate))
 }
 
@@ -1032,6 +1045,7 @@ mod test {
         DefaultPhysicalExprAdapterFactory, replace_columns_with_literals,
     };
     use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use datafusion_pruning::PruningPredicateConfig;
    use futures::{Stream, StreamExt};
    use object_store::{ObjectStore, memory::InMemory, path::Path};
    use parquet::arrow::ArrowWriter;
@@ -1185,6 +1199,7 @@
             enable_page_index: self.enable_page_index,
             enable_bloom_filter: self.enable_bloom_filter,
             enable_row_group_stats_pruning: self.enable_row_group_stats_pruning,
+            pruning_predicate_config: PruningPredicateConfig::default(),
             coerce_int96: self.coerce_int96,
             #[cfg(feature = "parquet_encryption")]
             file_decryption_properties: None,
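What the threaded-through config changes in practice: once an `InList` exceeds the limit, it no longer contributes to statistics pruning. A sketch of how one might observe this from the public API; whether the effect surfaces exactly as `always_true()` is an assumption to verify against the PR's tests, not something stated in this diff.

```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningPredicateConfig};
use datafusion::prelude::*;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let df_schema = DFSchema::try_from(Arc::clone(&schema))?;

    // 21 elements: one past the default `pruning_max_inlist_limit` of 20
    let expr = col("a").in_list((0..21).map(|v| lit(v)).collect(), false);
    let physical = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())?;

    let pruning_predicate = PruningPredicate::try_new(
        physical,
        Arc::clone(&schema),
        &PruningPredicateConfig::default(),
    )?;
    // Presumably the oversized IN list is dropped from the pruning expression,
    // leaving a predicate that can never rule out a container on its own
    println!("always_true: {}", pruning_predicate.always_true());
    Ok(())
}
```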
datafusion/datasource-parquet/src/page_filter.rs (7 additions, 2 deletions)
@@ -31,7 +31,7 @@ use arrow::{
 use datafusion_common::ScalarValue;
 use datafusion_common::pruning::PruningStatistics;
 use datafusion_physical_expr::{PhysicalExpr, split_conjunction};
-use datafusion_pruning::PruningPredicate;
+use datafusion_pruning::{PruningPredicate, PruningPredicateConfig};
 
 use log::{debug, trace};
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
@@ -119,14 +119,19 @@ impl PagePruningAccessPlanFilter {
     /// Create a new [`PagePruningAccessPlanFilter`] from a physical
     /// expression.
     #[expect(clippy::needless_pass_by_value)]
-    pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
+    pub fn new(
+        expr: &Arc<dyn PhysicalExpr>,
+        schema: SchemaRef,
+        config: &PruningPredicateConfig,
+    ) -> Self {
         // extract any single column predicates
         let predicates = split_conjunction(expr)
             .into_iter()
             .filter_map(|predicate| {
                 let pp = match PruningPredicate::try_new(
                     Arc::clone(predicate),
                     Arc::clone(&schema),
+                    config,
                 ) {
                     Ok(pp) => pp,
                     Err(e) => {