Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ impl DynamicFilterPhysicalExpr {
///
/// This method will return when [`Self::update`] is called and the generation increases.
/// It does not guarantee that the filter is complete.
///
/// Producers (e.g.) HashJoinExec may never update the expression or mark it as completed if there are no consumers.
/// If you call this method on a dynamic filter created by such a producer and there are no consumers registered this method would wait indefinitely.
/// This should not happen under normal operation and would indicate a programming error either in your producer or in DataFusion if the producer is a built in node.
pub async fn wait_update(&self) {
let mut rx = self.state_watch.subscribe();
// Get the current generation
Expand All @@ -287,17 +291,16 @@ impl DynamicFilterPhysicalExpr {

/// Wait asynchronously until this dynamic filter is marked as complete.
///
/// This method returns immediately if the filter is already complete or if the filter
/// is not being used by any consumers.
/// This method returns immediately if the filter is already complete.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
/// the filter is fully complete with no more updates expected.
pub async fn wait_complete(self: &Arc<Self>) {
if !self.is_used() {
return;
}

///
/// Producers (e.g.) HashJoinExec may never update the expression or mark it as completed if there are no consumers.
/// If you call this method on a dynamic filter created by such a producer and there are no consumers registered this method would wait indefinitely.
/// This should not happen under normal operation and would indicate a programming error either in your producer or in DataFusion if the producer is a built in node.
pub async fn wait_complete(&self) {
if self.inner.read().is_complete {
return;
}
Expand Down
10 changes: 0 additions & 10 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5078,11 +5078,6 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
let _consumer = Arc::clone(&dynamic_filter)
.with_new_children(vec![])
.unwrap();

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
Expand Down Expand Up @@ -5132,11 +5127,6 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
let _consumer = Arc::clone(&dynamic_filter)
.with_new_children(vec![])
.unwrap();

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
Expand Down