-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Multi-stage] Support lookup join #13966
Conversation
3d1beea
to
31df207
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #13966 +/- ##
============================================
+ Coverage 61.75% 63.81% +2.06%
- Complexity 207 1532 +1325
============================================
Files 2436 2622 +186
Lines 133233 144260 +11027
Branches 20636 22069 +1433
============================================
+ Hits 82274 92062 +9788
- Misses 44911 45407 +496
- Partials 6048 6791 +743
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
e272c60
to
04493c3
Compare
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) { | ||
// Lookup join | ||
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys"); | ||
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys)); | ||
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over | ||
// TableScanNode. | ||
Preconditions.checkState(rightInput instanceof Project, "Right input for lookup join must be a Project, got: %s", | ||
rightInput.getClass().getSimpleName()); | ||
Project project = (Project) rightInput; | ||
for (RexNode node : project.getProjects()) { | ||
Preconditions.checkState(node instanceof RexInputRef, | ||
"Right input for lookup join must be an identifier (RexInputRef) only Project, got: %s in project", | ||
node.getClass().getSimpleName()); | ||
} | ||
RelNode projectInput = PinotRuleUtils.unboxRel(project.getInput()); | ||
Preconditions.checkState(projectInput instanceof TableScan, | ||
"Right input for lookup join must be a Project over TableScan, got Project over: %s", | ||
projectInput.getClass().getSimpleName()); | ||
newRightInput = rightInput; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe not needed for this PR, but in the future I think we should need to start moving logic to new rules. For example, we can have a rule that only applies when the hint is enabled and the right hand side is a project and (...all conditions). If the rule applies, we change the node to a DimJoin + exchanges.
? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft()); | ||
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex | ||
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight()); | ||
PinotLogicalExchange left = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getLeft()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for future PRs: I think we can change PinotRuleUtils.unboxRel
to return T extends PinotLogicalExcange
. So we don't need to add the cast every time
// Validation | ||
JoinRelType joinType = node.getJoinType(); | ||
int numLeftColumns = leftSchema.size(); | ||
int numResultColumns = node.getDataSchema().size(); | ||
if (joinType.projectsRight()) { | ||
int numRightColumns = right.getDataSchema().size(); | ||
Preconditions.checkState(numLeftColumns + numRightColumns == numResultColumns, | ||
"Invalid number of columns for join type: %s, left: %s, right: %s, result: %s", joinType, numLeftColumns, | ||
numRightColumns, numResultColumns); | ||
} else { | ||
Preconditions.checkState(numLeftColumns == numResultColumns, | ||
"Invalid number of columns for join type: %s, left: %s, result: %s", joinType, numLeftColumns, | ||
numResultColumns); | ||
} | ||
|
||
PlanNode.NodeHint nodeHint = node.getNodeHint(); | ||
String joinStrategy = null; | ||
Map<String, String> joinHints = nodeHint.getHintOptions().get(PinotHintOptions.JOIN_HINT_OPTIONS); | ||
if (joinHints != null) { | ||
joinStrategy = joinHints.get(PinotHintOptions.JoinHintOptions.JOIN_STRATEGY); | ||
} | ||
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) { | ||
return new LookupJoinOperator(context, leftOperator, rightOperator, node); | ||
} else { | ||
return new HashJoinOperator(context, leftOperator, leftSchema, rightOperator, node); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not blocker, but I would prefer to have another Calcite operator for lookup joins
if (right instanceof MailboxReceiveNode | ||
&& ((MailboxReceiveNode) right).getExchangeType() == PinotRelExchangeType.PIPELINE_BREAKER) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. Don't you think we are repeating checks all the way down to the physical plan? Instead we should decide the type of join we will use at broker level and just blindly transform the calcite PinotPipelineBreakerJoin, PinotHashJoin or PinotLookupJoin into the physical operator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, not blocker
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting using our customized Rel
over the calcite LogicalJoin
so that we can differentiate different join types? Then we can add different JoinNode
accordingly for the ser/de?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short answer: Yes.
Long answer:
Calcite itself expects that. The root idea in Calcite is that rules should optimize logical rules (ie pushing filters into joins), apply distribution, etc and then some final rules will transform logical rules (ie LogicalJoin) into specific joins. Calcite for example includes Enumerable operators that implement most logical operators. For example, EnumerableMergeJoin implements a nested loop join while EnumerableHashJoin implements a hash join. EnumerableJoinRule can be used to decide which one should be used. We wouldn't use EnumerableJoinRule, instead we should have our own rule that decides whether to use hash join, semi join, lookup join... but even more advanced joins like ones that merge a join+limit or join+aggregate.
For example imagine a tree like:
Aggregate (count by A.col1)
Join
select A
select B
Right now Join emits (and allocate) a lot of rows just to be aggregated by its parent. It would be more efficient to count at the same time we build the blocks. Obviously we are not going to apply this kind of optimizations in the short term, but in the medium/large will be very effective and at the same time would be very error prone to repeat the logic that creates these optimizations in both Calcite (to prioritize the plans that can be optimized) and then in ServerPlanRequestVisitor.
Instead the Calcite's way should be to generate the AggregateJoinRel node and then we should be able to blindly generate the executable Pinot Operator whenever a AggregateJoinRel is received, without having to check conditions again (because we assume a friendly Broker that doesn't generate incorrect plans).
About serialization:
Due to our own decisions we decided to add an extra layer of JoinNodes (which I don't think they are necessary) and a layer of GRPC (which makes more sense, but we could just remove the JoinNode layer and transform Calcite operators directly into GRPC). We could also use the JSON representation of Calcite, but AFAIR we decided to use GRPC to do not depend on Calcite breaking backward compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added JoinStrategy
enum into the JoinNode
. It is quite hard to change the Calcite LogicalJoin
directly without breaking backward compatibility, so left that as a follow up
@@ -1,6 +1,7 @@ | |||
{ | |||
"tableName": "userGroups", | |||
"tableType": "OFFLINE", | |||
"isDimTable": true, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given we also use colocated quickstart to test colocated joins, I think we should have another table like userGroups that is not a dim table.
This is not an actual problem right now that both colocated and dim joins have to be enabled explicitly with hints, but will be an issue in the future when both are going to be applied by default.
* A problem in the sense that we will need to specify hints to use one or the other mode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As said a couple of times during the review, I would prefer to move the fact that we have a LookupJoin into Calcite planning phase instead of having to decide whether we use one or the other at both calcite level (generating different exchanges) and then in the server (when the physical plan is generated).
Anyway that is something we can discuss about in the future. For now the current solution is good enough for me.
I think it would be also cool to have a pinot property and a query option we can set to enable this feature by default. I'm already doing that in #13943
dfc80c2
to
b9b7561
Compare
b9b7561
to
c5a4efa
Compare
c5a4efa
to
40d5724
Compare
We need to document this new lookup mode. I already have this open PR in gitbook. It would be great if you could add a paragraph explaining this one and then merge it. |
@gortiz Good point. Seems I cannot directly modify the PR, so I merged it and I can add a new paragraph separately |
Please remember to document this lookup ;) |
lookup
join strategy as a hint (e.g./*+ joinOptions(join_strategy='lookup') */
)