12 changes: 12 additions & 0 deletions datafusion/common/src/config.rs
@@ -1115,6 +1115,18 @@ config_namespace! {
/// See: <https://trino.io/docs/current/admin/dynamic-filtering.html#dynamic-filter-collection-thresholds>
pub hash_join_inlist_pushdown_max_distinct_values: usize, default = 150

/// When true, pushes down hash table references for membership checks in hash joins
/// when the build side is too large for InList pushdown.
/// When false, no membership filter is created when InList thresholds are exceeded.
Comment on lines +1118 to +1120

Copilot AI Jan 21, 2026
The documentation states "When false, no membership filter is created when InList thresholds are exceeded." This could be clearer. Consider rephrasing to: "When false, no hash table reference (Map) is used for membership checks. If the build side exceeds InList thresholds, no membership filter is created (only bounds, if enabled)." This makes it clearer that InList filters can still be created when the build side is small enough, and that this config specifically controls the Map fallback behavior.

Suggested change
-/// When true, pushes down hash table references for membership checks in hash joins
-/// when the build side is too large for InList pushdown.
-/// When false, no membership filter is created when InList thresholds are exceeded.
+/// When true, pushes down hash table references (Map) for membership checks in hash joins
+/// when the build side is too large for InList pushdown.
+/// When false, no hash table reference (Map) is used for membership checks. If the build
+/// side exceeds InList thresholds, no membership filter is created (only bounds, if enabled).

/// Default: true
pub hash_join_map_pushdown: bool, default = true
Contributor

Perhaps it is also good to differentiate between the array map (contains_keys) and the hash map (contains_hashes). The first one should be much cheaper (no hashing / hash probing necessary).

Contributor

(i.e. the one introduced in #19411)
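For readers unfamiliar with the two variants this thread refers to, the cost difference can be sketched as follows. This is an illustrative sketch only: `SmallKeyMap`, `HashedMap`, and their `contains` methods are hypothetical names, not the structures from this PR or from #19411.

```rust
// Illustrative sketch only: hypothetical types, not the PR's data structures.
use std::collections::HashSet;

/// Membership keyed by small integers: a direct array index per probe,
/// so no hashing or hash probing is required (the cheaper case above).
struct SmallKeyMap {
    present: Vec<bool>, // present[k] == true iff key k exists on the build side
}

impl SmallKeyMap {
    fn contains(&self, key: usize) -> bool {
        // O(1) array access, no hash computation
        self.present.get(key).copied().unwrap_or(false)
    }
}

/// General membership: every probe pays for hashing plus bucket probing.
struct HashedMap {
    keys: HashSet<u64>,
}

impl HashedMap {
    fn contains(&self, key: u64) -> bool {
        // hash(key) + probe into the bucket
        self.keys.contains(&key)
    }
}

fn main() {
    let array_map = SmallKeyMap { present: vec![false, true, true, false] };
    let hash_map = HashedMap { keys: [1_u64, 2].into_iter().collect() };
    assert!(array_map.contains(2) && !array_map.contains(3));
    assert!(hash_map.contains(2) && !hash_map.contains(7));
}
```

If the flag were split along these lines, the cheap array-indexed check could stay enabled even when the hashed-map pushdown is turned off.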


/// When true, pushes down min/max bounds for join key columns.
/// This enables statistics-based pruning (e.g., Parquet row group skipping).
/// When false, only membership filters (InList or Map) are pushed down.
/// Default: true
pub hash_join_bounds_pushdown: bool, default = true
Comment on lines +1122 to +1128

Copilot AI Jan 21, 2026

The new configuration options hash_join_map_pushdown and hash_join_bounds_pushdown lack test coverage. While the PR description mentions existing hash join tests pass, there are no specific tests that validate the behavior when these options are set to false. Consider adding tests similar to the existing test_hashjoin_hash_table_pushdown_partitioned that verify:

  1. When hash_join_map_pushdown is false and build side exceeds InList thresholds, no membership filter is created (only bounds if enabled)
  2. When hash_join_bounds_pushdown is false, bounds predicates are not included in the filter
  3. When both are false but dynamic filter pushdown is enabled, appropriate behavior occurs
  4. Various combinations of these flags work correctly in both Partitioned and CollectLeft modes

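A minimal sketch of what such a test setup could look like, assuming the new options are exposed under the usual `datafusion.optimizer.` prefix like the existing `hash_join_inlist_pushdown_max_distinct_values` (the join construction and plan assertions are elided):

```rust
// Sketch only: builds a context with the new options disabled.
// Assumes the options land under the standard `datafusion.optimizer.` prefix.
use datafusion::prelude::{SessionConfig, SessionContext};

fn ctx_without_map_or_bounds_pushdown() -> SessionContext {
    let config = SessionConfig::new()
        // Dynamic filter pushdown itself is left at its default (enabled),
        // so the fallback paths added in this PR are exercised.
        // Cases 1 and 3: no Map fallback when the InList thresholds are exceeded.
        .set_bool("datafusion.optimizer.hash_join_map_pushdown", false)
        // Case 2: no min/max bounds in the pushed-down dynamic filter.
        .set_bool("datafusion.optimizer.hash_join_bounds_pushdown", false);
    SessionContext::new_with_config(config)
}
```

Running the same join as test_hashjoin_hash_table_pushdown_partitioned against such a context and inspecting the explained plan would cover cases 1–3; repeating it in both Partitioned and CollectLeft modes would cover case 4.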

/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
19 changes: 16 additions & 3 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -1152,6 +1152,11 @@ impl ExecutionPlan for HashJoinExec {
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
let repartition_random_state = REPARTITION_RANDOM_STATE;
let bounds_pushdown_enabled = context
.session_config()
.options()
.optimizer
.hash_join_bounds_pushdown;
let build_accumulator = enable_dynamic_filter_pushdown
.then(|| {
self.dynamic_filter.as_ref().map(|df| {
@@ -1169,6 +1174,7 @@
filter,
on_right,
repartition_random_state,
bounds_pushdown_enabled,
))
})))
})
@@ -1702,7 +1708,8 @@ async fn collect_left_input(
PushdownStrategy::Empty
} else {
// If the build side is small enough we can use IN list pushdown.
// If it's too big we fall back to pushing down a reference to the hash table.
// If it's too big we fall back to pushing down a reference to the hash table
// (if map pushdown is enabled).
// See `PushdownStrategy` for more details.
let estimated_size = left_values
.iter()
@@ -1716,11 +1723,17 @@
.optimizer
.hash_join_inlist_pushdown_max_distinct_values
{
PushdownStrategy::Map(Arc::clone(&map))
if config.optimizer.hash_join_map_pushdown {
PushdownStrategy::Map(Arc::clone(&map))
} else {
PushdownStrategy::Disabled
}
} else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? {
PushdownStrategy::InList(in_list_values)
} else {
} else if config.optimizer.hash_join_map_pushdown {
PushdownStrategy::Map(Arc::clone(&map))
} else {
PushdownStrategy::Disabled
}
};

Expand Down
39 changes: 29 additions & 10 deletions datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs
@@ -136,6 +136,8 @@ fn create_membership_predicate(
)) as Arc<dyn PhysicalExpr>)),
// Empty partition - should not create a filter for this
PushdownStrategy::Empty => Ok(None),
// User disabled map pushdown - only bounds (if enabled) will be used
PushdownStrategy::Disabled => Ok(None),
}
}

@@ -226,6 +228,8 @@ pub(crate) struct SharedBuildAccumulator {
repartition_random_state: SeededRandomState,
/// Schema of the probe (right) side for evaluating filter expressions
probe_schema: Arc<Schema>,
/// Whether to include bounds in the dynamic filter
bounds_pushdown_enabled: bool,
}

/// Strategy for filter pushdown (decided at collection time)
@@ -237,6 +241,8 @@ pub(crate) enum PushdownStrategy {
Map(Arc<Map>),
/// There was no data in this partition, do not build a dynamic filter for it
Empty,
/// User disabled map pushdown via config, only use bounds (if enabled)
Disabled,
}

/// Build-side data reported by a single partition
@@ -302,6 +308,7 @@ impl SharedBuildAccumulator {
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
on_right: Vec<PhysicalExprRef>,
repartition_random_state: SeededRandomState,
bounds_pushdown_enabled: bool,
) -> Self {
// Troubleshooting: If partition counts are incorrect, verify this logic matches
// the actual execution pattern in collect_build_side()
@@ -342,6 +349,7 @@ impl SharedBuildAccumulator {
on_right,
repartition_random_state,
probe_schema: right_child.schema(),
bounds_pushdown_enabled,
}
}

@@ -410,11 +418,15 @@ impl SharedBuildAccumulator {
self.probe_schema.as_ref(),
)?;

// Create bounds check expression (if bounds available)
let bounds_expr = create_bounds_predicate(
&self.on_right,
&partition_data.bounds,
);
// Create bounds check expression (if bounds available and enabled)
let bounds_expr = if self.bounds_pushdown_enabled {
create_bounds_predicate(
&self.on_right,
&partition_data.bounds,
)
} else {
None
};

// Combine membership and bounds expressions for multi-layer optimization:
// - Bounds (min/max): Enable statistics-based pruning (Parquet row group/file skipping)
@@ -490,6 +502,7 @@ impl SharedBuildAccumulator {
as Arc<dyn PhysicalExpr>;

// Create WHEN branches for each partition
let bounds_enabled = self.bounds_pushdown_enabled;
let when_then_branches: Vec<(
Arc<dyn PhysicalExpr>,
Arc<dyn PhysicalExpr>,
@@ -499,8 +512,10 @@
.filter_map(|(partition_id, partition_opt)| {
partition_opt.as_ref().and_then(|partition| {
// Skip empty partitions - they would always return false anyway
// Skip disabled partitions when bounds are also disabled - no filter to create
match &partition.pushdown {
PushdownStrategy::Empty => None,
PushdownStrategy::Disabled if !bounds_enabled => None,
_ => Some((partition_id, partition)),
}
})
@@ -520,11 +535,15 @@
self.probe_schema.as_ref(),
)?;

// 2. Create bounds check expression for this partition (if bounds available)
let bounds_expr = create_bounds_predicate(
&self.on_right,
&partition.bounds,
);
// 2. Create bounds check expression for this partition (if bounds available and enabled)
let bounds_expr = if self.bounds_pushdown_enabled {
create_bounds_predicate(
&self.on_right,
&partition.bounds,
)
} else {
None
};

// 3. Combine membership and bounds expressions
let then_expr = match (membership_expr, bounds_expr) {
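The hunk is truncated right at the combination step. As a sketch only (not the code behind the truncated lines), AND-ing the two optional expressions with the public `BinaryExpr`/`Operator` API could look like this:

```rust
// Sketch only: combine optional membership and bounds predicates with AND,
// falling back to whichever side is present. Not the PR's actual code.
use std::sync::Arc;

use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::BinaryExpr;
use datafusion_physical_expr::PhysicalExpr;

fn combine_predicates(
    membership_expr: Option<Arc<dyn PhysicalExpr>>,
    bounds_expr: Option<Arc<dyn PhysicalExpr>>,
) -> Option<Arc<dyn PhysicalExpr>> {
    match (membership_expr, bounds_expr) {
        // Both present: membership AND bounds, so either layer can prune rows.
        (Some(m), Some(b)) => {
            Some(Arc::new(BinaryExpr::new(m, Operator::And, b)) as Arc<dyn PhysicalExpr>)
        }
        // Only one present: use it directly.
        (Some(m), None) => Some(m),
        (None, Some(b)) => Some(b),
        // Neither present (e.g. PushdownStrategy::Disabled with bounds disabled): no filter.
        (None, None) => None,
    }
}
```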