Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ cscope.*
.externalToolBuilders/
maven-eclipse.xml
target/
examples/
/examples/
/logs/
bin/
*/bin/
.idea
Expand Down
6 changes: 6 additions & 0 deletions pinot-common/src/main/proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,17 @@ enum JoinType {
ANTI = 5;
}

enum JoinStrategy {
HASH = 0;
LOOKUP = 1;
}

message JoinNode {
JoinType joinType = 1;
repeated int32 leftKeys = 2;
repeated int32 rightKeys = 3;
repeated Expression nonEquiConditions = 4;
JoinStrategy joinStrategy = 5;
}

enum ExchangeType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,27 @@ public static class WindowHintOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
// "hash" is the default strategy for non-SEMI joins
public static final String HASH_JOIN_STRATEGY = "hash";
// "dynamic_broadcast" is the default strategy for SEMI joins
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
// "lookup" can be used when the right table is a dimension table replicated to all workers
public static final String LOOKUP_JOIN_STRATEGY = "lookup";

/**
* Max rows allowed to build the right table hash collection.
*/
public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";

/**
* Mode when join overflow happens, supported values: THROW or BREAK.
* THROW(default): Break right table build process, and throw exception, no JOIN with left table performed.
* BREAK: Break right table build process, continue to perform JOIN operation, results might be partial.
*/
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";

/**
* Indicat that the join operator(s) within a certain selection scope are colocated
* Indicates that the join operator(s) within a certain selection scope are colocated
*/
public static final String IS_COLOCATED_BY_JOIN_KEYS = "is_colocated_by_join_keys";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;


Expand All @@ -48,24 +50,32 @@ public boolean matches(RelOptRuleCall call) {
@Override
public void onMatch(RelOptRuleCall call) {
Join join = call.rel(0);
RelNode leftInput = join.getInput(0);
RelNode rightInput = join.getInput(1);

RelNode leftExchange;
RelNode rightExchange;
RelNode left = PinotRuleUtils.unboxRel(join.getInput(0));
RelNode right = PinotRuleUtils.unboxRel(join.getInput(1));
JoinInfo joinInfo = join.analyzeCondition();

if (joinInfo.leftKeys.isEmpty()) {
// when there's no JOIN key, use broadcast.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.RANDOM_DISTRIBUTED);
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.BROADCAST_DISTRIBUTED);
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
RelNode newLeft;
RelNode newRight;
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategy)) {
// Lookup join - add local exchange on the left side
newLeft = PinotLogicalExchange.create(left, RelDistributions.SINGLETON);
newRight = right;
} else {
// when join key exists, use hash distribution.
leftExchange = PinotLogicalExchange.create(leftInput, RelDistributions.hash(joinInfo.leftKeys));
rightExchange = PinotLogicalExchange.create(rightInput, RelDistributions.hash(joinInfo.rightKeys));
// Regular join - add exchange on both sides
if (joinInfo.leftKeys.isEmpty()) {
// Broadcast the right side if there is no join key
newLeft = PinotLogicalExchange.create(left, RelDistributions.RANDOM_DISTRIBUTED);
newRight = PinotLogicalExchange.create(right, RelDistributions.BROADCAST_DISTRIBUTED);
} else {
// Use hash exchange when there are join keys
newLeft = PinotLogicalExchange.create(left, RelDistributions.hash(joinInfo.leftKeys));
newRight = PinotLogicalExchange.create(right, RelDistributions.hash(joinInfo.rightKeys));
}
}

call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), leftExchange, rightExchange, join.getJoinType(),
// TODO: Consider creating different JOIN Rel for each join strategy
call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), newLeft, newRight, join.getJoinType(),
join.isSemiJoinDone()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.pinot.calcite.rel.rules;

import java.util.Collections;
import java.util.List;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.hep.HepRelVertex;
Expand All @@ -35,7 +33,6 @@
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.zookeeper.common.StringUtils;


/**
Expand Down Expand Up @@ -125,25 +122,27 @@ public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
@Override
public boolean matches(RelOptRuleCall call) {
Join join = call.rel(0);
String joinStrategyString =
PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies =
joinStrategyString != null ? StringUtils.split(joinStrategyString, ",") : Collections.emptyList();
boolean explicitOtherStrategy = !joinStrategies.isEmpty() && !joinStrategies.contains(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);

// Do not apply this rule if join strategy is explicitly set to something other than dynamic broadcast
String joinStrategy = PinotHintStrategyTable.getHintOption(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (joinStrategy != null && !joinStrategy.equals(
PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY)) {
return false;
}

// Do not apply this rule if it is not a SEMI join
JoinInfo joinInfo = join.analyzeCondition();
if (join.getJoinType() != JoinRelType.SEMI || !joinInfo.nonEquiConditions.isEmpty()
|| joinInfo.leftKeys.size() != 1) {
return false;
}

// Apply this rule if the left side can be pushed as dynamic exchange
RelNode left = ((HepRelVertex) join.getLeft()).getCurrentRel();
RelNode right = ((HepRelVertex) join.getRight()).getCurrentRel();
return left instanceof Exchange && right instanceof Exchange
// left side can be pushed as dynamic exchange
&& PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& !explicitOtherStrategy
// condition for SEMI join
&& join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty()
&& joinInfo.leftKeys.size() == 1;
return left instanceof Exchange && right instanceof Exchange && PinotRuleUtils.canPushDynamicBroadcastToLeaf(
left.getInput(0));
}

@Override
Expand All @@ -158,15 +157,10 @@ public void onMatch(RelOptRuleCall call) {
boolean isColocatedJoin =
PinotHintStrategyTable.isHintOptionTrue(join.getHints(), PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
PinotLogicalExchange dynamicBroadcastExchange;
RelNode rightInput = right.getInput();
if (isColocatedJoin) {
RelDistribution dist = RelDistributions.hash(join.analyzeCondition().rightKeys);
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
} else {
RelDistribution dist = RelDistributions.BROADCAST_DISTRIBUTED;
dynamicBroadcastExchange = PinotLogicalExchange.create(rightInput, dist, PinotRelExchangeType.PIPELINE_BREAKER);
}
RelDistribution relDistribution = isColocatedJoin ? RelDistributions.hash(join.analyzeCondition().rightKeys)
: RelDistributions.BROADCAST_DISTRIBUTED;
PinotLogicalExchange dynamicBroadcastExchange =
PinotLogicalExchange.create(right.getInput(), relDistribution, PinotRelExchangeType.PIPELINE_BREAKER);

call.transformTo(join.copy(join.getTraitSet(), join.getCondition(), left.getInput(), dynamicBroadcastExchange,
join.getJoinType(), join.isSemiJoinDone()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.query.planner.logical;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,9 +33,12 @@
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.SetOp;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
Expand All @@ -45,13 +49,18 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
import org.apache.pinot.calcite.rel.hint.PinotHintStrategyTable;
import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate;
import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange;
import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.DataSchema;
Expand Down Expand Up @@ -264,11 +273,62 @@ private TableScanNode convertLogicalTableScan(LogicalTableScan node) {
convertInputs(node.getInputs()), tableName, columns);
}

private JoinNode convertLogicalJoin(LogicalJoin node) {
JoinInfo joinInfo = node.analyzeCondition();
return new JoinNode(DEFAULT_STAGE_ID, toDataSchema(node.getRowType()), NodeHint.fromRelHints(node.getHints()),
convertInputs(node.getInputs()), node.getJoinType(), joinInfo.leftKeys, joinInfo.rightKeys,
RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions));
private JoinNode convertLogicalJoin(LogicalJoin join) {
JoinInfo joinInfo = join.analyzeCondition();
DataSchema dataSchema = toDataSchema(join.getRowType());
List<PlanNode> inputs = convertInputs(join.getInputs());
JoinRelType joinType = join.getJoinType();

// Run some validations for join
Preconditions.checkState(inputs.size() == 2, "Join should have exactly 2 inputs, got: %s", inputs.size());
PlanNode left = inputs.get(0);
PlanNode right = inputs.get(1);
int numLeftColumns = left.getDataSchema().size();
int numResultColumns = dataSchema.size();
if (joinType.projectsRight()) {
int numRightColumns = right.getDataSchema().size();
Preconditions.checkState(numLeftColumns + numRightColumns == numResultColumns,
"Invalid number of columns for join type: %s, left: %s, right: %s, result: %s", joinType, numLeftColumns,
numRightColumns, numResultColumns);
} else {
Preconditions.checkState(numLeftColumns == numResultColumns,
"Invalid number of columns for join type: %s, left: %s, result: %s", joinType, numLeftColumns,
numResultColumns);
}

// Check if the join hint specifies the join strategy
JoinNode.JoinStrategy joinStrategy;
ImmutableList<RelHint> relHints = join.getHints();
String joinStrategyHint = PinotHintStrategyTable.getHintOption(relHints, PinotHintOptions.JOIN_HINT_OPTIONS,
PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
if (PinotHintOptions.JoinHintOptions.LOOKUP_JOIN_STRATEGY.equals(joinStrategyHint)) {
joinStrategy = JoinNode.JoinStrategy.LOOKUP;

// Run some validations for lookup join
Preconditions.checkArgument(!joinInfo.leftKeys.isEmpty(), "Lookup join requires join keys");
// Right table should be a dimension table, and the right input should be an identifier only ProjectNode over
// TableScanNode.
RelNode rightInput = PinotRuleUtils.unboxRel(join.getRight());
Preconditions.checkState(rightInput instanceof Project, "Right input for lookup join must be a Project, got: %s",
rightInput.getClass().getSimpleName());
Project project = (Project) rightInput;
for (RexNode node : project.getProjects()) {
Preconditions.checkState(node instanceof RexInputRef,
"Right input for lookup join must be an identifier (RexInputRef) only Project, got: %s in project",
node.getClass().getSimpleName());
}
RelNode projectInput = PinotRuleUtils.unboxRel(project.getInput());
Preconditions.checkState(projectInput instanceof TableScan,
"Right input for lookup join must be a Project over TableScan, got Project over: %s",
projectInput.getClass().getSimpleName());
} else {
// TODO: Consider adding DYNAMIC_BROADCAST as a separate join strategy
joinStrategy = JoinNode.JoinStrategy.HASH;
}

return new JoinNode(DEFAULT_STAGE_ID, dataSchema, NodeHint.fromRelHints(relHints), inputs, joinType,
joinInfo.leftKeys, joinInfo.rightKeys, RexExpressionUtils.fromRexNodes(joinInfo.nonEquiConditions),
joinStrategy);
}

private List<PlanNode> convertInputs(List<RelNode> inputs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@ public class JoinNode extends BasePlanNode {
private final List<Integer> _leftKeys;
private final List<Integer> _rightKeys;
private final List<RexExpression> _nonEquiConditions;
private final JoinStrategy _joinStrategy;

public JoinNode(int stageId, DataSchema dataSchema, NodeHint nodeHint, List<PlanNode> inputs, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions) {
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> nonEquiConditions,
JoinStrategy joinStrategy) {
super(stageId, dataSchema, nodeHint, inputs);
_joinType = joinType;
_leftKeys = leftKeys;
_rightKeys = rightKeys;
_nonEquiConditions = nonEquiConditions;
_joinStrategy = joinStrategy;
}

public JoinRelType getJoinType() {
Expand All @@ -56,6 +59,10 @@ public List<RexExpression> getNonEquiConditions() {
return _nonEquiConditions;
}

public JoinStrategy getJoinStrategy() {
return _joinStrategy;
}

@Override
public String explain() {
return "JOIN";
Expand All @@ -68,7 +75,8 @@ public <T, C> T visit(PlanNodeVisitor<T, C> visitor, C context) {

@Override
public PlanNode withInputs(List<PlanNode> inputs) {
return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType, _leftKeys, _rightKeys, _nonEquiConditions);
return new JoinNode(_stageId, _dataSchema, _nodeHint, inputs, _joinType, _leftKeys, _rightKeys, _nonEquiConditions,
_joinStrategy);
}

@Override
Expand All @@ -84,11 +92,16 @@ public boolean equals(Object o) {
}
JoinNode joinNode = (JoinNode) o;
return _joinType == joinNode._joinType && Objects.equals(_leftKeys, joinNode._leftKeys) && Objects.equals(
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions);
_rightKeys, joinNode._rightKeys) && Objects.equals(_nonEquiConditions, joinNode._nonEquiConditions)
&& _joinStrategy == joinNode._joinStrategy;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions);
return Objects.hash(super.hashCode(), _joinType, _leftKeys, _rightKeys, _nonEquiConditions, _joinStrategy);
}

public enum JoinStrategy {
HASH, LOOKUP
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public static NodeHint fromRelHints(List<RelHint> relHints) {
} else {
hintOptions = Maps.newHashMapWithExpectedSize(numHints);
for (RelHint relHint : relHints) {
hintOptions.put(relHint.hintName, relHint.kvOptions);
// Put the first matching hint to match the behavior of PinotHintStrategyTable
hintOptions.putIfAbsent(relHint.hintName, relHint.kvOptions);
}
}
return new NodeHint(hintOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ private static JoinNode deserializeJoinNode(Plan.PlanNode protoNode) {
Plan.JoinNode protoJoinNode = protoNode.getJoinNode();
return new JoinNode(protoNode.getStageId(), extractDataSchema(protoNode), extractNodeHint(protoNode),
extractInputs(protoNode), convertJoinType(protoJoinNode.getJoinType()), protoJoinNode.getLeftKeysList(),
protoJoinNode.getRightKeysList(), convertExpressions(protoJoinNode.getNonEquiConditionsList()));
protoJoinNode.getRightKeysList(), convertExpressions(protoJoinNode.getNonEquiConditionsList()),
convertJoinStrategy(protoJoinNode.getJoinStrategy()));
}

private static MailboxReceiveNode deserializeMailboxReceiveNode(Plan.PlanNode protoNode) {
Expand Down Expand Up @@ -274,6 +275,17 @@ private static JoinRelType convertJoinType(Plan.JoinType joinType) {
}
}

private static JoinNode.JoinStrategy convertJoinStrategy(Plan.JoinStrategy joinStrategy) {
switch (joinStrategy) {
case HASH:
return JoinNode.JoinStrategy.HASH;
case LOOKUP:
return JoinNode.JoinStrategy.LOOKUP;
default:
throw new IllegalStateException("Unsupported JoinStrategy: " + joinStrategy);
}
}

private static PinotRelExchangeType convertExchangeType(Plan.ExchangeType exchangeType) {
switch (exchangeType) {
case STREAMING:
Expand Down
Loading