Skip to content

Commit

Permalink
[Multi-stage] Support lookup join
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang committed Sep 12, 2024
1 parent cd5fb1a commit dfc80c2
Show file tree
Hide file tree
Showing 33 changed files with 3,349 additions and 173 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cscope.*
.externalToolBuilders/
maven-eclipse.xml
target/
examples/
/examples/
bin/
*/bin/
.idea
Expand Down
6 changes: 6 additions & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,17 @@ enum JoinType {
ANTI = 5;
}

// Strategy used to execute a join.
enum JoinStrategy {
// Hash join. This is the zero value, so it is what readers see when joinStrategy is not set (proto3 default).
HASH = 0;
// Lookup join.
LOOKUP = 1;
}

// Join plan node.
message JoinNode {
// Type of the join (see JoinType).
JoinType joinType = 1;
// Equi-join keys on the left input (int32 indexes; presumably field ordinals of the left input - confirm).
repeated int32 leftKeys = 2;
// Equi-join keys on the right input, parallel to leftKeys.
repeated int32 rightKeys = 3;
// Join conditions that are not plain key equalities.
repeated Expression nonEquiConditions = 4;
// Strategy used to execute the join (see JoinStrategy).
JoinStrategy joinStrategy = 5;
}

enum ExchangeType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,27 @@ public static class WindowHintOptions {

/**
 * Option keys, and the values accepted for them, that can be supplied through the join hint.
 */
public static class JoinHintOptions {
// Option key selecting the join strategy; the accepted values are the strategy constants below
public static final String JOIN_STRATEGY = "join_strategy";
// "hash" is the default strategy for non-SEMI joins
public static final String HASH_JOIN_STRATEGY = "hash";
// "dynamic_broadcast" is the default strategy for SEMI joins
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
// NOTE(review): the semantics of "hash_table" are not described here - confirm against its usages
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";

/**
* Max rows allowed to build the right table hash collection.
*/
public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";

/**
* Mode when join overflow happens, supported values: THROW or BREAK.
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
* BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
*/
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";

/**
* Indicates that the join operator(s) within a certain selection scope are colocated
*/
public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;


Expand Down Expand Up @@ -56,27 +58,33 @@ public boolean matches(RelOptRuleCall call) {
@Override
public void onMatch(RelOptRuleCall call) {
Join join = call.rel(0);
RelNode leftInput = join.getInput(0);
RelNode rightInput = join.getInput(1);

RelNode leftExchange;
RelNode rightExchange;
RelNode left = PinotRuleUtils.unboxRel(join.getInput(0));
RelNode right = PinotRuleUtils.unboxRel(join.getInput(1));
JoinInfo joinInfo = join.analyzeCondition();

if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
RelNode newLeft;
RelNode newRight;
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) {
// Lookup join - add local exchange on the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = right;
} else {
// when join key exists, use hash distribution.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
// Regular join - add exchange on both sides
if (joinInfo.leftKeys.isEmpty()) {
// Broadcast the right side if there is no join key
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// Use hash exchange when there are join keys
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
}
}

RelNode newJoinNode =
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), leftExchange, rightExchange,
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList()));
call.transformTo(newJoinNode);
// TODO: Consider creating different JOIN Rel for each join strategy
call.transformTo(
new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), newLeft, newRight, join.getCondition(),
join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
package org.apache.pinot.calcite.rel.rules;

import com.google.common.collect.ImmutableList;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelNode;
Expand All @@ -37,7 +34,6 @@
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.zookeeper.common.StringUtils;


/**
Expand Down Expand Up @@ -129,55 +125,49 @@ public boolean matches(RelOptRuleCall call) {
if (call.rels.length < 1 || !(call.rel(0) instanceof Join)) {
return false;
}

// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
Join join = call.rel(0);
String joinStrategyString = PinotHintStrategyTable.getHintOption(join.getHints(),
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
: Collections.emptyList();
boolean explicitOtherStrategy = joinStrategies.size() > 0
&& !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (joinStrategy != null && !joinStrategy.equals(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
return false;
}

// Do not apply this rule if it is not a SEMI join
JoinInfo joinInfo = join.analyzeCondition();
RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
: join.getLeft();
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
return left instanceof Exchange && right instanceof Exchange
// left side can be pushed as dynamic exchange
&& PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& !explicitOtherStrategy
// condition for SEMI join
&& join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty()
&& joinInfo.leftKeys.size() == 1;
if (join.getJoinType() != JoinRelType.SEMI || !joinInfo.nonEquiConditions.isEmpty()
|| joinInfo.leftKeys.size() != 1) {
return false;
}

// Apply this rule if the left side can be pushed as dynamic exchange
RelNode left = PinotRuleUtils.unboxRel(join.getLeft());
RelNode right = PinotRuleUtils.unboxRel(join.getRight());
return left instanceof Exchange && right instanceof Exchange && PinotRuleUtils.canPushDynamicBroadcastToLeaf(
left.getInput(0));
}

/**
 * Replaces the matched join with one whose right side is fed through a pipeline-breaker exchange.
 *
 * <p>Both join inputs are expected to be {@link PinotLogicalExchange}s (they are cast without a check here). The
 * left exchange is removed so the join reads the left exchange's input directly; the right exchange is replaced
 * with a {@code PIPELINE_BREAKER} exchange over the same input. The replacement join keeps the original condition,
 * join type, hints, variables and system fields.
 *
 * @param call rule call whose first rel is the join to rewrite
 */
@Override
public void onMatch(RelOptRuleCall call) {
  Join join = call.rel(0);
  PinotLogicalExchange left = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getLeft());
  PinotLogicalExchange right = (PinotLogicalExchange) PinotRuleUtils.unboxRel(join.getRight());

  // when colocated join hint is given, dynamic broadcast exchange can be hash-distributed b/c
  //    1. currently, dynamic broadcast only works against main table off leaf-stage; (e.g. receive node on leaf)
  //    2. when hash key are the same but hash functions are different, it can be done via normal hash shuffle.
  boolean isColocatedJoin =
      PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
          PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
  RelDistribution relDistribution = isColocatedJoin ? RelDistributions.hash(join.analyzeCondition().rightKeys)
      : RelDistributions.BROADCAST_DISTRIBUTED;
  PinotLogicalExchange dynamicBroadcastExchange =
      PinotLogicalExchange.create(right.getInput(), relDistribution, PinotRelExchangeType.PIPELINE_BREAKER);
  // TODO: Consider creating different JOIN Rel for each join strategy
  call.transformTo(new LogicalJoin(join.getCluster(), join.getTraitSet(), join.getHints(), left.getInput(),
      dynamicBroadcastExchange, join.getCondition(), join.getVariablesSet(), join.getJoinType(),
      join.isSemiJoinDone(), ImmutableList.copyOf(join.getSystemFieldList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query.planner.logical;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -30,8 +31,12 @@
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
Expand All @@ -42,13 +47,18 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.DataSchema;
Expand Down Expand Up @@ -255,11 +265,62 @@ private TableScanNode convertLogicalTableScan(LogicalTableScan node) {
convertInputs(node.getInputs()), tableName, columns);
}

private JoinNode convertLogicalJoin(LogicalJoin node) {
JoinInfo joinInfo = node.analyzeCondition();
return new JoinNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
convertInputs(node.getInputs()), node.getJoinType(), joinInfo.leftKeys, joinInfo.rightKeys,
RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions));
private JoinNode convertLogicalJoin(LogicalJoin join) {
JoinInfo joinInfo = join.analyzeCondition();
DataSchema dataSchema = toDataSchema(join.getRowType());
List<PlanNode> inputs = convertInputs(join.getInputs());
JoinRelType joinType = join.getJoinType();

// Run some validations for join
Preconditions.checkState(inputs.size() == 2, "Join should have exactly 2 inputs, got: %s", inputs.size());
PlanNode left = inputs.get(0);
PlanNode right = inputs.get(1);
int numLeftColumns = left.getDataSchema().size();
int numResultColumns = dataSchema.size();
if (joinType.projectsRight()) {
int numRightColumns = right.getDataSchema().size();
Preconditions.checkState(numLeftColumns + numRightColumns == numResultColumns,
"Invalid number of columns for join type: %s, left: %s, right: %s, result: %s", joinType, numLeftColumns,
numRightColumns, numResultColumns);
} else {
Preconditions.checkState(numLeftColumns == numResultColumns,
"Invalid number of columns for join type: %s, left: %s, result: %s", joinType, numLeftColumns,
numResultColumns);
}

// Check if the join hint specifies the join strategy
JoinNode.JoinStrategy joinStrategy;
ImmutableList<RelHint> relHints = join.getHints();
String joinStrategyHint = PinotHintStrategyTable.getHintOption(relHints, PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategyHint)) {
joinStrategy = JoinNode.JoinStrategy.LOOKUP;

// Run some validations for lookup join
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over
// TableScanNode.
RelNode rightInput = PinotRuleUtils.unboxRel(join.getRight());
Preconditions.checkState(rightInput instanceof Project, "Right input for lookup join must be a Project, got: %s",
rightInput.getClass().getSimpleName());
Project project = (Project) rightInput;
for (RexNode node : project.getProjects()) {
Preconditions.checkState(node instanceof RexInputRef,
"Right input for lookup join must be an identifier (RexInputRef) only Project, got: %s in project",
node.getClass().getSimpleName());
}
RelNode projectInput = PinotRuleUtils.unboxRel(project.getInput());
Preconditions.checkState(projectInput instanceof TableScan,
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
projectInput.getClass().getSimpleName());
} else {
// TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
joinStrategy = JoinNode.JoinStrategy.HASH;
}

return new JoinNode(DEFAULT_STAGE_ID, dataSchema, NodeHint.fromRelHints(relHints), inputs, joinType,
joinInfo.leftKeys, joinInfo.rightKeys, RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions),
joinStrategy);
}

private List<PlanNode> convertInputs(List<RelNode> inputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ public class JoinNode extends BasePlanNode {
private final List<Integer> _leftKeys;
private final List<Integer> _rightKeys;
private final List<RexExpression> _nonEquiConditions;
private final JoinStrategy _joinStrategy;

/**
 * Creates a join plan node.
 *
 * @param stageId stage id, passed to the base plan node
 * @param dataSchema schema of the join result, passed to the base plan node
 * @param nodeHint hints attached to the node, passed to the base plan node
 * @param inputs input plan nodes, passed to the base plan node
 * @param joinType type of the join
 * @param leftKeys equi-join keys on the left input
 * @param rightKeys equi-join keys on the right input
 * @param nonEquiConditions join conditions that are not plain key equalities
 * @param joinStrategy strategy used to execute the join
 */
public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, List<PlanNode> inputs, JoinRelType joinType,
    List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions,
    JoinStrategy joinStrategy) {
  super(stageId, dataSchema, nodeHint, inputs);
  _joinType = joinType;
  _leftKeys = leftKeys;
  _rightKeys = rightKeys;
  _nonEquiConditions = nonEquiConditions;
  _joinStrategy = joinStrategy;
}

public JoinRelType getJoinType() {
Expand All @@ -56,6 +59,10 @@ public List<RexExpression> getNonEquiConditions() {
return _nonEquiConditions;
}

/**
 * Returns the join strategy this node was created with.
 */
public JoinStrategy getJoinStrategy() {
return _joinStrategy;
}

@Override
public String explain() {
return "JOIN";
Expand All @@ -79,11 +86,16 @@ public boolean equals(Object o) {
}
JoinNode joinNode = (JoinNode) o;
return _joinType == joinNode._joinType && Objects.equals(_leftKeys, joinNode._leftKeys) && Objects.equals(
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions);
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions)
&& _joinStrategy == joinNode._joinStrategy;
}

/**
 * Combines the base node hash with every join-specific field: join type, left and right keys, non-equi
 * conditions and join strategy.
 */
@Override
public int hashCode() {
  return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy);
}

/**
 * Strategy used to execute a join.
 */
public enum JoinStrategy {
  /** Hash join. */
  HASH,
  /** Lookup join. */
  LOOKUP
}
}
Loading

0 comments on commit dfc80c2

Please sign in to comment.