Skip to content

Commit

Permalink
[Multi-stage] Support lookup join
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Sep 9, 2024
1 parent cd315cc commit 31df207
Show file tree
Hide file tree
Showing 11 changed files with 452 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,27 @@ public static class WindowHintOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
// "hash_table" is the default strategy for non-SEMI joins
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
// "dynamic_broadcast" is the default strategy for SEMI joins
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";

/**
* Max rows allowed to build the right table hash collection.
*/
public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";

/**
* Mode when join overflow happens, supported values: THROW or BREAK.
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
* BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
*/
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";

/**
* Indicat that the join operator(s) within a certain selection scope are colocated
* Indicates that the join operator(s) within a certain selection scope are colocated
*/
public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@
*/
package org.apache.pinot.calcite.rel.rules;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;


Expand Down Expand Up @@ -56,27 +63,49 @@ public boolean matches(RelOptRuleCall call) {
@Override
public void onMatch(RelOptRuleCall call) {
Join join = call.rel(0);
RelNode leftInput = join.getInput(0);
RelNode rightInput = join.getInput(1);

RelNode leftExchange;
RelNode rightExchange;
RelNode leftInput = PinotRuleUtils.unboxRel(join.getInput(0));
RelNode rightInput = PinotRuleUtils.unboxRel(join.getInput(1));
JoinInfo joinInfo = join.analyzeCondition();
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);

if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
RelNode newLeftInput;
RelNode newRightInput;
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) {
// Lookup join
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over
// TableScanNode.
Preconditions.checkState(rightInput instanceof Project, "Right input for lookup join must be a Project, got: %s",
rightInput.getClass().getSimpleName());
Project project = (Project) rightInput;
for (RexNode node : project.getProjects()) {
Preconditions.checkState(node instanceof RexInputRef,
"Right input for lookup join must be an identifier (RexInputRef) only Project, got: %s in project",
node.getClass().getSimpleName());
}
RelNode projectInput = PinotRuleUtils.unboxRel(project.getInput());
Preconditions.checkState(projectInput instanceof TableScan,
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
projectInput.getClass().getSimpleName());
newRightInput = rightInput;
} else {
// when join key exists, use hash distribution.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
// Regular join
if (joinInfo.leftKeys.isEmpty()) {
// Broadcast the right table if there is no join key
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
newRightInput = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// Use hash join when there are join keys
newLeftInput = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
newRightInput = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
}
}

RelNode newJoinNode =
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), leftExchange, rightExchange,
call.transformTo(
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), newLeftInput, newRightInput,
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList()));
call.transformTo(newJoinNode);
ImmutableList.copyOf(join.getSystemFieldList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
package org.apache.pinot.calcite.rel.rules;

import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
Expand All @@ -37,7 +34,6 @@
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.zookeeper.common.StringUtils;


/**
Expand Down Expand Up @@ -129,51 +125,46 @@ public boolean matches(RelOptRuleCall call) {
if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
return false;
}

// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
Join join = call.rel(0);
String joinStrategyString = PinotHintStrategyTable.getHintOption(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
: Collections.emptyList();
boolean explicitOtherStrategy = joinStrategies.size() > 0
&& !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (joinStrategy != null && !joinStrategy.equals(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
return false;
}

// Do not apply this rule if it is not a SEMI join
JoinInfo joinInfo = join.analyzeCondition();
RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
: join.getLeft();
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
return left instanceof Exchange && right instanceof Exchange
// left side can be pushed as dynamic exchange
&& PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& !explicitOtherStrategy
// condition for SEMI join
&& join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty()
&& joinInfo.leftKeys.size() == 1;
if (join.getJoinType() != JoinRelType.SEMI || !joinInfo.nonEquiConditions.isEmpty()
|| joinInfo.leftKeys.size() != 1) {
return false;
}

// Apply this rule if the left side can be pushed as dynamic exchange
RelNode left = PinotRuleUtils.unboxRel(join.getLeft());
RelNode right = PinotRuleUtils.unboxRel(join.getRight());
return left instanceof Exchange && right instanceof Exchange && PinotRuleUtils.canPushDynamicBroadcastToLeaf(
left.getInput(0));
}

@Override
public void onMatch(RelOptRuleCall call) {
Join join = call.rel(0);
PinotLogicalExchange left = (PinotLogicalExchange) (join.getLeft() instanceof HepRelVertex
? ((HepRelVertex) join.getLeft()).getCurrentRel() : join.getLeft());
PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() instanceof HepRelVertex
? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
PinotLogicalExchange left = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getLeft());
PinotLogicalExchange right = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getRight());

// when colocated join hint is given, dynamic broadcast exchange can be hash-distributed b/c
// 1. currently, dynamic broadcast only works against main table off leaf-stage; (e.g. receive node on leaf)
// 2. when hash key are the same but hash functions are different, it can be done via normal hash shuffle.
boolean isColocatedJoin = PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
PinotLogicalExchange dynamicBroadcastExchange;
RelNode rightInput = right.getInput();
if (isColocatedJoin) {
RelDistribution dist = RelDistributions.hash(join.analyzeCondition().rightKeys);
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
} else {
RelDistribution dist = RelDistributions.BROADCAST_DISTRIBUTED;
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
}
boolean isColocatedJoin =
PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
RelDistribution relDistribution = isColocatedJoin ? RelDistributions.hash(join.analyzeCondition().rightKeys)
: RelDistributions.BROADCAST_DISTRIBUTED;
PinotLogicalExchange dynamicBroadcastExchange =
PinotLogicalExchange.create(right.getInput(), relDistribution, PinotRelExchangeType.PIPELINE_BREAKER);
Join dynamicFilterJoin =
new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,12 @@


/**
* This basic {@code BroadcastJoinOperator} implement a basic broadcast join algorithm.
* This algorithm assumes that the broadcast table has to fit in memory since we are not supporting any spilling.
*
* For left join, inner join, right join and full join,
* <p>It takes the right table as the broadcast side and materialize a hash table. Then for each of the left table row,
* it looks up for the corresponding row(s) from the hash table and create a joint row.
*
* <p>For each of the data block received from the left table, it will generate a joint data block.
* We currently support left join, inner join, right join and full join.
* The output is in the format of [left_row, right_row]
* This {@code HashJoinOperator} implements the hash join algorithm.
* <p>This algorithm assumes that the right table fits in memory since spilling is not supported. It reads the
* complete hash partitioned right table and materializes the data into a hash table. Then for each left table row,
* it looks up the corresponding row(s) in the hash table and creates a joined row.
* <p>For each data block received from the left table, it generates a joined data block. The output is in the
* format of [left_row, right_row].
*/
// TODO: Move inequi out of hashjoin. (https://github.com/apache/pinot/issues/9728)
// TODO: Support memory size based resource limit.
Expand Down Expand Up @@ -122,9 +118,9 @@ public class HashJoinOperator extends MultiStageOperator {
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema,
    MultiStageOperator rightInput, JoinNode node) {
  super(context);
  _joinType = node.getJoinType();
  // Guava's Preconditions only substitutes "%s" placeholders; a bare "%" is left in the message verbatim and the
  // argument is appended in square brackets instead.
  Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(_joinType), "Join type: %s is not supported for hash join",
      _joinType);
  // Key selectors extract the join key from the rows of each side
  _leftKeySelector = KeySelectorFactory.getKeySelector(node.getLeftKeys());
  _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
  _leftColumnSize = leftSchema.size();
Expand Down Expand Up @@ -231,8 +227,8 @@ private void buildBroadcastHashTable()
// Row based overflow check.
if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, "
+ "reached number of rows limit: " + _maxRowsInJoin);
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
Expand Down Expand Up @@ -319,25 +315,6 @@ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock)
}
}

/**
 * Builds the SEMI join output for one left block: a left row is emitted when its join key is present in the right
 * hash table. Stops consuming the block as soon as {@code incrementJoinedRowsAndCheckLimit()} returns true.
 *
 * @param leftBlock block of rows received from the left input
 * @return one joined row (left row with no right row) per matching left row
 * @throws ProcessingException propagated from the joined rows limit check
 */
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
    throws ProcessingException {
  List<Object[]> leftRows = leftBlock.getContainer();
  List<Object[]> matchedRows = new ArrayList<>(leftRows.size());

  for (Object[] candidate : leftRows) {
    // SEMI-JOIN only checks existence of the key
    if (!_broadcastRightTable.containsKey(_leftKeySelector.getKey(candidate))) {
      continue;
    }
    if (incrementJoinedRowsAndCheckLimit()) {
      break;
    }
    matchedRows.add(joinRow(candidate, null));
  }

  return matchedRows;
}

private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
Expand Down Expand Up @@ -389,6 +366,25 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
return rows;
}

/**
 * Builds the SEMI join output for one left block: every left row whose join key is present in the right hash table
 * is added to the result as-is. Returns early once {@code incrementJoinedRowsAndCheckLimit()} returns true.
 *
 * @param leftBlock block of rows received from the left input
 * @return the matching left rows, in the order they appear in the block
 * @throws ProcessingException propagated from the joined rows limit check
 */
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
    throws ProcessingException {
  List<Object[]> leftRows = leftBlock.getContainer();
  List<Object[]> matchedRows = new ArrayList<>(leftRows.size());

  for (Object[] candidate : leftRows) {
    // SEMI-JOIN only checks existence of the key
    boolean keyExists = _broadcastRightTable.containsKey(_leftKeySelector.getKey(candidate));
    if (!keyExists) {
      continue;
    }
    if (incrementJoinedRowsAndCheckLimit()) {
      return matchedRows;
    }
    matchedRows.add(candidate);
  }

  return matchedRows;
}

private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
Expand All @@ -401,7 +397,7 @@ private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
rows.add(joinRow(leftRow, null));
rows.add(leftRow);
}
}

Expand Down Expand Up @@ -430,18 +426,11 @@ private List<Object[]> buildNonMatchRightRows() {

/**
 * Creates a result row of {@code _resultColumnSize} columns from a left and a right row.
 * <p>The left row is copied starting at index 0 and the right row starting at index {@code _leftColumnSize}. When a
 * side is {@code null}, its columns keep the array's initial {@code null} values.
 *
 * @param leftRow left row, or {@code null} when there is no left side for this result row
 * @param rightRow right row, or {@code null} when there is no right side for this result row
 * @return newly allocated joined row
 */
private Object[] joinRow(@Nullable Object[] leftRow, @Nullable Object[] rightRow) {
  Object[] resultRow = new Object[_resultColumnSize];
  if (leftRow != null) {
    System.arraycopy(leftRow, 0, resultRow, 0, leftRow.length);
  }
  if (rightRow != null) {
    // Right columns always start after the left columns, even when the left row is null
    System.arraycopy(rightRow, 0, resultRow, _leftColumnSize, rightRow.length);
  }
  return resultRow;
}
Expand Down Expand Up @@ -485,8 +474,8 @@ private boolean incrementJoinedRowsAndCheckLimit()
_currentJoinedRows++;
if (_currentJoinedRows > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
+ _maxRowsInJoin);
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot process join, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
Expand All @@ -504,15 +493,15 @@ private void throwProcessingExceptionForJoinRowLimitExceeded(String reason)
throws ProcessingException {
ProcessingException resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
resourceLimitExceededException.setMessage(
reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
+ "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
+ " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
resourceLimitExceededException.setMessage(reason
+ ". Consider increasing the limit for the maximum number of rows in a join either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
+ "'. Alternatively, if partial results are acceptable, the join overflow mode can be set to '"
+ JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
+ "'.");
throw resourceLimitExceededException;
}

Expand Down
Loading

0 comments on commit 31df207

Please sign in to comment.