Skip to content

POC: Parse to Merge Logical Plan #15862

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

jonathanc-n
Copy link
Contributor

Which issue does this PR close?

part of #13385

Rationale for this change

Adds AST -> MERGE logical plan.

What changes are included in this PR?

The rationale is that the Merge node will emit a Join plan and case expressions. Both sides of the join will emit a column of null/not nulls to be able to match/not match on each row.

@jonathanc-n jonathanc-n marked this pull request as draft April 26, 2025 00:04
@github-actions github-actions bot added sql SQL Planner logical-expr Logical plan and expressions labels Apr 26, 2025
@jonathanc-n jonathanc-n changed the title POC: AST -> Merge Logical Plan POC: Parse to Merge Logical Plan Apr 26, 2025
let merged = LogicalPlanBuilder::from(join_plan)
.filter(delete_pred.not())?
.project(vec![
Expr::Case(Case {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to do this, I have not implemented it yet however using a case expression like this would need a case expression for every column that is to be updated. I think it would be better to somehow do the full update to the entire row at once. Is there an expression for this?

let target_src = self.context_provider.get_table_source(target_ref.clone())?;
let target_scan =
LogicalPlanBuilder::scan(target_ref.clone(), Arc::clone(&target_src), None)?
.project(projected_columns, lit(true).alias("target_exists")])? // add flag for matching target
Copy link
Contributor Author

@jonathanc-n jonathanc-n Apr 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an earlier commit, i just put the a projected column here as a placeholder, I will update the pr later.

@jonathanc-n
Copy link
Contributor Author

PTAL @jayzhan211 @universalmind303

@@ -241,3 +241,10 @@ fn make_count_schema() -> DFSchemaRef {
.unwrap(),
)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I missing something or are not using this struct in you code?

Generally I would argue that this struct should be more general, essentially a conversion from the AST representation into Datafusion concepts. Without performing the actual MERGE logic. This would mean converting the TableFactors into LogicalPlan::Scan and the MergeClauses into a Datafusion structs containing the expressions.

When keeping the struct more general, the actual MERGE logic can then be performed when planning the physical plan. Since Datafusion natively doesn't support UPDATEs and DELETEs, this leaves more room for extensions to provide this functionality.

But that's just my point of view.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I didn't fully complete the pull request yet. Currently, the implementation is converting the table factors into a scan that are then combined into a join; the extra flag columns are used to perform matching later on when branching for the INSERTs, UPDATEs, and DELETEs. Do you think just passing along a Scan instead of fully converting into a join would be better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the join makes sense. You'll probably always need to perform it.

I think one issue we're going to run into is that you will need to somehow fork the join node. You will need a stream of record batches for the matching flags and then you will need to reuse the join node to get a stream of record batches for the not matching flags.
This is currently not well supported with datafusions execution model. When you currently use the results of a node you can't reuse them anywhere else.

The case expression is a bit unclear to me. But maybe I just have to try to understand it better. I thought it would be easier to keep the match clauses separate. So that the later implementation can handle them more easily.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
logical-expr Logical plan and expressions sql SQL Planner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants