Skip to content

Commit 3d1beea

Browse files
committed
[Multi-stage] Support lookup join
1 parent c10efb6 commit 3d1beea

File tree

11 files changed

+452
-132
lines changed

11 files changed

+452
-132
lines changed

pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintOptions.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,27 @@ public static class WindowHintOptions {
6161

6262
public static class JoinHintOptions {
6363
public static final String JOIN_STRATEGY = "join_strategy";
64-
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
64+
// "hash_table" is the default strategy for non-SEMI joins
6565
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
66+
// "dynamic_broadcast" is the default strategy for SEMI joins
67+
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
68+
// "lookup" can be used when the right table is a dimension table replicated to all workers
69+
public static final String LOOKUP_JOIN_STRATEGY = "lookup";
70+
6671
/**
6772
* Max rows allowed to build the right table hash collection.
6873
*/
6974
public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";
75+
7076
/**
7177
* Mode when join overflow happens, supported values: THROW or BREAK.
7278
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
7379
* BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
7480
*/
7581
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";
82+
7683
/**
77-
* Indicat that the join operator(s) within a certain selection scope are colocated
84+
* Indicates that the join operator(s) within a certain selection scope are colocated
7885
*/
7986
public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
8087
}

pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,22 @@
1818
*/
1919
package org.apache.pinot.calcite.rel.rules;
2020

21+
import com.google.common.base.Preconditions;
2122
import com.google.common.collect.ImmutableList;
2223
import org.apache.calcite.plan.RelOptRule;
2324
import org.apache.calcite.plan.RelOptRuleCall;
2425
import org.apache.calcite.rel.RelDistributions;
2526
import org.apache.calcite.rel.RelNode;
2627
import org.apache.calcite.rel.core.Join;
2728
import org.apache.calcite.rel.core.JoinInfo;
29+
import org.apache.calcite.rel.core.Project;
30+
import org.apache.calcite.rel.core.TableScan;
2831
import org.apache.calcite.rel.logical.LogicalJoin;
32+
import org.apache.calcite.rex.RexInputRef;
33+
import org.apache.calcite.rex.RexNode;
2934
import org.apache.calcite.tools.RelBuilderFactory;
35+
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
36+
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
3037
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
3138

3239

@@ -56,27 +63,49 @@ public boolean matches(RelOptRuleCall call) {
5663
@Override
5764
public void onMatch(RelOptRuleCall call) {
5865
Join join = call.rel(0);
59-
RelNode leftInput = join.getInput(0);
60-
RelNode rightInput = join.getInput(1);
61-
62-
RelNode leftExchange;
63-
RelNode rightExchange;
66+
RelNode leftInput = PinotRuleUtils.unboxRel(join.getInput(0));
67+
RelNode rightInput = PinotRuleUtils.unboxRel(join.getInput(1));
6468
JoinInfo joinInfo = join.analyzeCondition();
69+
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
70+
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
6571

66-
if (joinInfo.leftKeys.isEmpty()) {
67-
// when there's no JOIN key, use broadcast.
68-
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
69-
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
72+
RelNode newLeftInput;
73+
RelNode newRightInput;
74+
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) {
75+
// Lookup join
76+
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
77+
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
78+
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over
79+
// TableScanNode.
80+
Preconditions.checkState(rightInput instanceof Project, "Right input for lookup join must be a Project, got: %s",
81+
rightInput.getClass().getSimpleName());
82+
Project project = (Project) rightInput;
83+
for (RexNode node : project.getProjects()) {
84+
Preconditions.checkState(node instanceof RexInputRef,
85+
"Right input for lookup join must be an identifier (RexInputRef) only Project, got: %s in project",
86+
node.getClass().getSimpleName());
87+
}
88+
RelNode projectInput = PinotRuleUtils.unboxRel(project.getInput());
89+
Preconditions.checkState(projectInput instanceof TableScan,
90+
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
91+
projectInput.getClass().getSimpleName());
92+
newRightInput = rightInput;
7093
} else {
71-
// when join key exists, use hash distribution.
72-
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
73-
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
94+
// Regular join
95+
if (joinInfo.leftKeys.isEmpty()) {
96+
// Broadcast the right table if there is no join key
97+
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
98+
newRightInput = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
99+
} else {
100+
// Use hash join when there are join keys
101+
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
102+
newRightInput = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
103+
}
74104
}
75105

76-
RelNode newJoinNode =
77-
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), leftExchange, rightExchange,
106+
call.transformTo(
107+
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), newLeftInput, newRightInput,
78108
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
79-
ImmutableList.copyOf(join.getSystemFieldList()));
80-
call.transformTo(newJoinNode);
109+
ImmutableList.copyOf(join.getSystemFieldList())));
81110
}
82111
}

pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java

Lines changed: 28 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,8 @@
1919
package org.apache.pinot.calcite.rel.rules;
2020

2121
import com.google.common.collect.ImmutableList;
22-
import java.util.Collections;
23-
import java.util.List;
2422
import org.apache.calcite.plan.RelOptRule;
2523
import org.apache.calcite.plan.RelOptRuleCall;
26-
import org.apache.calcite.plan.hep.HepRelVertex;
2724
import org.apache.calcite.rel.RelDistribution;
2825
import org.apache.calcite.rel.RelDistributions;
2926
import org.apache.calcite.rel.RelNode;
@@ -37,7 +34,6 @@
3734
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
3835
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
3936
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
40-
import org.apache.zookeeper.common.StringUtils;
4137

4238

4339
/**
@@ -129,51 +125,46 @@ public boolean matches(RelOptRuleCall call) {
129125
if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
130126
return false;
131127
}
128+
129+
// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
132130
Join join = call.rel(0);
133-
String joinStrategyString = PinotHintStrategyTable.getHintOption(join.getHints(),
134-
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
135-
List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
136-
: Collections.emptyList();
137-
boolean explicitOtherStrategy = joinStrategies.size() > 0
138-
&& !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);
131+
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
132+
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
133+
if (joinStrategy != null && !joinStrategy.equals(
134+
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
135+
return false;
136+
}
139137

138+
// Do not apply this rule if it is not a SEMI join
140139
JoinInfo joinInfo = join.analyzeCondition();
141-
RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
142-
: join.getLeft();
143-
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
144-
: join.getRight();
145-
return left instanceof Exchange && right instanceof Exchange
146-
// left side can be pushed as dynamic exchange
147-
&& PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
148-
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
149-
&& !explicitOtherStrategy
150-
// condition for SEMI join
151-
&& join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty()
152-
&& joinInfo.leftKeys.size() == 1;
140+
if (join.getJoinType() != JoinRelType.SEMI || !joinInfo.nonEquiConditions.isEmpty()
141+
|| joinInfo.leftKeys.size() != 1) {
142+
return false;
143+
}
144+
145+
// Apply this rule if the left side can be pushed as dynamic exchange
146+
RelNode left = PinotRuleUtils.unboxRel(join.getLeft());
147+
RelNode right = PinotRuleUtils.unboxRel(join.getRight());
148+
return left instanceof Exchange && right instanceof Exchange && PinotRuleUtils.canPushDynamicBroadcastToLeaf(
149+
left.getInput(0));
153150
}
154151

155152
@Override
156153
public void onMatch(RelOptRuleCall call) {
157154
Join join = call.rel(0);
158-
PinotLogicalExchange left = (PinotLogicalExchange) (join.getLeft() instanceof HepRelVertex
159-
? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
160-
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
161-
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
155+
PinotLogicalExchange left = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getLeft());
156+
PinotLogicalExchange right = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getRight());
162157

163158
// when colocated join hint is given, dynamic broadcast exchange can be hash-distributed b/c
164159
// 1. currently, dynamic broadcast only works against main table off leaf-stage; (e.g. receive node on leaf)
165160
// 2. when hash key are the same but hash functions are different, it can be done via normal hash shuffle.
166-
boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
167-
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
168-
PinotLogicalExchange dynamicBroadcastExchange;
169-
RelNode rightInput = right.getInput();
170-
if (isColocatedJoin) {
171-
RelDistribution dist = RelDistributions.hash(join.analyzeCondition().rightKeys);
172-
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
173-
} else {
174-
RelDistribution dist = RelDistributions.BROADCAST_DISTRIBUTED;
175-
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
176-
}
161+
boolean isColocatedJoin =
162+
PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
163+
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
164+
RelDistribution relDistribution = isColocatedJoin ? RelDistributions.hash(join.analyzeCondition().rightKeys)
165+
: RelDistributions.BROADCAST_DISTRIBUTED;
166+
PinotLogicalExchange dynamicBroadcastExchange =
167+
PinotLogicalExchange.create(right.getInput(), relDistribution, PinotRelExchangeType.PIPELINE_BREAKER);
177168
Join dynamicFilterJoin =
178169
new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
179170
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),

pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java

Lines changed: 43 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,12 @@
5353

5454

5555
/**
56-
* This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm.
57-
* This algorithm assumes that the broadcast table has to fit in memory since we are not supporting any spilling.
58-
*
59-
* For left join, inner join, right join and full join,
60-
* <p>It takes the right table as the broadcast side and materialize a hash table. Then for each of the left table row,
61-
* it looks up for the corresponding row(s) from the hash table and create a joint row.
62-
*
63-
* <p>For each of the data block received from the left table, it will generate a joint data block.
64-
* We currently support left join, inner join, right join and full join.
65-
* The output is in the format of [left_row, right_row]
56+
* This {@code HashJoinOperator} implements the hash join algorithm.
57+
* <p>This algorithm assumes that the right table has to fit in memory since we are not supporting any spilling. It
58+
* reads the complete hash partitioned right table and materialize the data into a hash table. Then for each of the left
59+
* table row, it looks up for the corresponding row(s) from the hash table and create a joint row.
60+
* <p>For each of the data block received from the left table, it generates a joint data block. The output is in the
61+
* format of [left_row, right_row].
6662
*/
6763
// TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
6864
// TODO: Support memory size based resource limit.
@@ -122,9 +118,9 @@ public class HashJoinOperator extends MultiStageOperator {
122118
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema,
123119
MultiStageOperator rightInput, JoinNode node) {
124120
super(context);
125-
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinType()),
126-
"Join type: " + node.getJoinType() + " is not supported!");
127121
_joinType = node.getJoinType();
122+
Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(_joinType), "Join type: % is not supported for hash join",
123+
_joinType);
128124
_leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys());
129125
_rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
130126
_leftColumnSize = leftSchema.size();
@@ -231,8 +227,8 @@ private void buildBroadcastHashTable()
231227
// Row based overflow check.
232228
if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
233229
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
234-
throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, "
235-
+ "reached number of rows limit: " + _maxRowsInJoin);
230+
throwProcessingExceptionForJoinRowLimitExceeded(
231+
"Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin);
236232
} else {
237233
// Just fill up the buffer.
238234
int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
@@ -319,25 +315,6 @@ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock)
319315
}
320316
}
321317

322-
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
323-
throws ProcessingException {
324-
List<Object[]> container = leftBlock.getContainer();
325-
List<Object[]> rows = new ArrayList<>(container.size());
326-
327-
for (Object[] leftRow : container) {
328-
Object key = _leftKeySelector.getKey(leftRow);
329-
// SEMI-JOIN only checks existence of the key
330-
if (_broadcastRightTable.containsKey(key)) {
331-
if (incrementJoinedRowsAndCheckLimit()) {
332-
break;
333-
}
334-
rows.add(joinRow(leftRow, null));
335-
}
336-
}
337-
338-
return rows;
339-
}
340-
341318
private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
342319
throws ProcessingException {
343320
List<Object[]> container = leftBlock.getContainer();
@@ -389,6 +366,25 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
389366
return rows;
390367
}
391368

369+
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
370+
throws ProcessingException {
371+
List<Object[]> container = leftBlock.getContainer();
372+
List<Object[]> rows = new ArrayList<>(container.size());
373+
374+
for (Object[] leftRow : container) {
375+
Object key = _leftKeySelector.getKey(leftRow);
376+
// SEMI-JOIN only checks existence of the key
377+
if (_broadcastRightTable.containsKey(key)) {
378+
if (incrementJoinedRowsAndCheckLimit()) {
379+
break;
380+
}
381+
rows.add(leftRow);
382+
}
383+
}
384+
385+
return rows;
386+
}
387+
392388
private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
393389
throws ProcessingException {
394390
List<Object[]> container = leftBlock.getContainer();
@@ -401,7 +397,7 @@ private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
401397
if (incrementJoinedRowsAndCheckLimit()) {
402398
break;
403399
}
404-
rows.add(joinRow(leftRow, null));
400+
rows.add(leftRow);
405401
}
406402
}
407403

@@ -430,18 +426,11 @@ private List<Object[]> buildNonMatchRightRows() {
430426

431427
private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) {
432428
Object[] resultRow = new Object[_resultColumnSize];
433-
int idx = 0;
434429
if (leftRow != null) {
435-
for (Object obj : leftRow) {
436-
resultRow[idx++] = obj;
437-
}
430+
System.arraycopy(leftRow, 0, resultRow, 0, leftRow.length);
438431
}
439-
// This is needed since left row can be null and we need to advance the idx to the beginning of right row.
440-
idx = _leftColumnSize;
441432
if (rightRow != null) {
442-
for (Object obj : rightRow) {
443-
resultRow[idx++] = obj;
444-
}
433+
System.arraycopy(rightRow, 0, resultRow, _leftColumnSize, rightRow.length);
445434
}
446435
return resultRow;
447436
}
@@ -485,8 +474,8 @@ private boolean incrementJoinedRowsAndCheckLimit()
485474
_currentJoinedRows++;
486475
if (_currentJoinedRows > _maxRowsInJoin) {
487476
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
488-
throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
489-
+ _maxRowsInJoin);
477+
throwProcessingExceptionForJoinRowLimitExceeded(
478+
"Cannot process join, reached number of rows limit: " + _maxRowsInJoin);
490479
} else {
491480
// Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
492481
logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
@@ -504,15 +493,15 @@ private void throwProcessingExceptionForJoinRowLimitExceeded(String reason)
504493
throws ProcessingException {
505494
ProcessingException resourceLimitExceededException =
506495
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
507-
resourceLimitExceededException.setMessage(
508-
reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
509-
+ "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
510-
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
511-
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
512-
+ " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
513-
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
514-
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
515-
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
496+
resourceLimitExceededException.setMessage(reason
497+
+ ". Consider increasing the limit for the maximum number of rows in a join either via the query option '"
498+
+ CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
499+
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
500+
+ "'. Alternatively, if partial results are acceptable, the join overflow mode can be set to '"
501+
+ JoinOverFlowMode.BREAK.name() + "' either via the query option '"
502+
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
503+
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
504+
+ "'.");
516505
throw resourceLimitExceededException;
517506
}
518507

0 commit comments

Comments
 (0)