21 changes: 14 additions & 7 deletions datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -276,6 +276,12 @@ impl DynamicFilterPhysicalExpr {
///
/// This method will return when [`Self::update`] is called and the generation increases.
/// It does not guarantee that the filter is complete.
///
/// # Note
///
/// This method should only be called on filters that have consumers. If you don't
/// know whether the filter is being used, call [`Self::is_used`] first to avoid
/// waiting indefinitely.
pub async fn wait_update(&self) {
let mut rx = self.state_watch.subscribe();
// Get the current generation
@@ -287,17 +293,18 @@ impl DynamicFilterPhysicalExpr {

/// Wait asynchronously until this dynamic filter is marked as complete.
///
/// This method returns immediately if the filter is already complete or if the filter
/// is not being used by any consumers.
/// This method returns immediately if the filter is already complete.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it returns,
/// the filter is fully complete with no more updates expected.
pub async fn wait_complete(self: &Arc<Self>) {
if !self.is_used() {
return;
}

///
/// # Note
///
/// This method should only be called on filters that have consumers. If you don't
/// know whether the filter is being used, call [`Self::is_used`] first to avoid
/// waiting indefinitely.
Contributor

I think there's a bit of nuance here. This is not because of the DynamicFilterPhysicalExpr API itself, it's only because of how HashJoinExec is implemented.

Under normal operation it would be a consumer calling wait_complete, and hence it knows that a consumer exists because it is a consumer. In other words, under normal operation wait_complete is only called by consumers and thus is_used would always be true.

Or put another way, the only way this could go wrong is e.g. in a test or if HashJoinExec itself called wait_complete. By definition if more than 1 thing has a reference to the dynamic filter, there is a consumer. If there is only 1 reference, it must be the one HashJoinExec has (outside of tests). So it would have to be HashJoinExec that is calling wait_complete() right?

So the scenario described here seems more like a programming error or misuse of the APIs, not something that could happen under normal, bug-free usage of these APIs, right? In other words: if I were implementing this I could probably put the is_used() check behind #[cfg(debug_assertions)] or something to catch a programming error on my end, but it wouldn't really make sense to have that check at runtime in production, right?
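
To make the debug-only check concrete, here is a minimal sketch. It is not DataFusion's code: `ToyFilter`, `is_used`, and `must_wait` are hypothetical names, and it assumes "used" simply means that more than one strong reference to the filter exists. It is synchronous on purpose, so it only shows the invariant check and not the actual waiting.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the dynamic filter; not DataFusion's actual type.
pub struct ToyFilter {
    pub is_complete: bool,
}

/// Assumption: a filter is "used" when more than one strong reference exists,
/// i.e. something besides the producer is holding it.
pub fn is_used(filter: &Arc<ToyFilter>) -> bool {
    Arc::strong_count(filter) > 1
}

/// Checks the "caller is a consumer" invariant in debug builds only, then
/// reports whether the caller would still have to wait for completion.
pub fn must_wait(filter: &Arc<ToyFilter>) -> bool {
    // `debug_assert!` is compiled out when debug assertions are disabled,
    // so release builds pay nothing for this check.
    debug_assert!(
        is_used(filter),
        "wait_complete called on a filter with no consumers"
    );
    !filter.is_complete
}
```

With this shape, calling the method without a consumer fails loudly in debug builds and tests instead of hanging, while production builds skip the check entirely.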

Contributor Author

> Under normal operation it would be a consumer calling wait_complete, and hence it knows that a consumer exists because it is a consumer. In other words, under normal operation wait_complete is only called by consumers and thus is_used would always be true.

🤔 I agree -- when I implemented this API, I had in mind that it would be used mainly in custom probe nodes of a HashJoinExec, so yes, ideally wait_complete is always called by consumers (in those cases it's impossible for wait_complete to wait indefinitely).

However, I remember that when I added is_used to HashJoinExec::execute(), the tests were waiting indefinitely. That was mainly because is_used previously checked only the inner struct, which is no longer the case. That is why I then added the is_used check inside wait_complete.

I included this note mainly thinking about a scenario where some third-party node has a reference to the DynamicFilter of the HashJoin but doesn't know if it has a consumer or not. However, in that case, the third-party node would hold a reference and is_used would return true, then filters would be computed and wait_complete would return successfully.

So yes, if this happens, it would be a programming error. I will remove the comment.

Contributor

Or just make it clear that this scenario would only result from a programming error 😄

pub async fn wait_complete(&self) {
if self.inner.read().is_complete {
return;
}
10 changes: 0 additions & 10 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -5078,11 +5078,6 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
let _consumer = Arc::clone(&dynamic_filter)
.with_new_children(vec![])
.unwrap();

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
@@ -5132,11 +5127,6 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);

// Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
let _consumer = Arc::clone(&dynamic_filter)
.with_new_children(vec![])
.unwrap();

// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,