Commit 49d0a75

SinBex authored and zhuzhurk committed
[FLINK-36607][table-planner] Introduce AdaptiveJoinProcessor to inject adaptive join node.
1 parent c1d2ddc commit 49d0a75

File tree

4 files changed: +536 -2 lines changed
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java

Lines changed: 179 additions & 0 deletions
@@ -0,0 +1,179 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.planner.plan.nodes.exec.processor;

import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.utils.TableConfigUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;

/**
 * An {@link ExecNodeGraphProcessor} which replaces qualified join nodes with adaptive join nodes.
 */
public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {

    @Override
    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
        if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
            throw new TableException("AdaptiveJoin does not support streaming jobs.");
        }
        if (!isAdaptiveJoinEnabled(context)) {
            return execGraph;
        }

        AbstractExecNodeExactlyOnceVisitor visitor =
                new AbstractExecNodeExactlyOnceVisitor() {
                    @Override
                    protected void visitNode(ExecNode<?> node) {
                        visitInputs(node);
                        // AdaptiveJoin conversion should be avoided when there is a
                        // KEEP_INPUT_AS_IS constraint downstream. We don't need to check all
                        // downstream nodes of the join, because the KEEP_INPUT_AS_IS constraint
                        // will be bound to a BatchExecExchange, which will be the direct
                        // downstream node of the join.
                        if (shouldKeepInputAsIs(node.getInputProperties())) {
                            return;
                        }
                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
                            ExecEdge edge = node.getInputEdges().get(i);
                            ExecNode<?> newNode = tryReplaceWithAdaptiveJoinNode(edge.getSource());
                            node.replaceInputEdge(
                                    i,
                                    ExecEdge.builder()
                                            .source(newNode)
                                            .target(node)
                                            .shuffle(edge.getShuffle())
                                            .exchangeMode(edge.getExchangeMode())
                                            .build());
                        }
                    }
                };

        List<ExecNode<?>> newRootNodes =
                execGraph.getRootNodes().stream()
                        .map(
                                node -> {
                                    node = tryReplaceWithAdaptiveJoinNode(node);
                                    node.accept(visitor);
                                    return node;
                                })
                        .collect(Collectors.toList());

        return new ExecNodeGraph(execGraph.getFlinkVersion(), newRootNodes);
    }

    private ExecNode<?> tryReplaceWithAdaptiveJoinNode(ExecNode<?> node) {
        // For AdaptiveJoin to be converted, its upstream input must ensure:
        // 1. Data distribution is by Hash.
        // 2. No upstream nodes require KEEP_INPUT_AS_IS (achieved by inserting BatchExecExchange).
        if (!(areAllInputsHashShuffle(node))
                || shouldKeepUpstreamExchangeInputAsIs(node.getInputEdges())) {
            return node;
        }
        ExecNode<?> newNode = node;
        if (node instanceof AdaptiveJoinExecNode
                && ((AdaptiveJoinExecNode) node).canBeTransformedToAdaptiveJoin()) {
            BatchExecAdaptiveJoin adaptiveJoin = ((AdaptiveJoinExecNode) node).toAdaptiveJoinNode();
            replaceInputEdge(adaptiveJoin, node);
            newNode = adaptiveJoin;
        }

        return newNode;
    }

    private boolean shouldKeepInputAsIs(List<InputProperty> inputProperties) {
        return inputProperties.stream()
                .anyMatch(
                        inputProperty ->
                                inputProperty.getRequiredDistribution().getType()
                                        == KEEP_INPUT_AS_IS);
    }

    // If a KEEP_INPUT_AS_IS constraint exists on an operator, it will always show on its upstream
    // BatchExecExchange.
    private boolean shouldKeepUpstreamExchangeInputAsIs(List<ExecEdge> inputEdges) {
        return inputEdges.stream()
                .filter(execEdge -> execEdge.getSource() instanceof BatchExecExchange)
                .map(execEdge -> (BatchExecExchange) execEdge.getSource())
                .anyMatch(exchange -> shouldKeepInputAsIs(exchange.getInputProperties()));
    }

    private boolean isAdaptiveJoinEnabled(ProcessorContext context) {
        TableConfig tableConfig = context.getPlanner().getTableConfig();
        boolean isAdaptiveJoinEnabled =
                tableConfig.get(
                                OptimizerConfigOptions
                                        .TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY)
                        != OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.NONE
                        && !TableConfigUtils.isOperatorDisabled(
                                tableConfig, OperatorType.BroadcastHashJoin);
        JobManagerOptions.SchedulerType schedulerType =
                context.getPlanner()
                        .getExecEnv()
                        .getConfig()
                        .getSchedulerType()
                        .orElse(JobManagerOptions.SchedulerType.AdaptiveBatch);
        boolean isAdaptiveBatchSchedulerEnabled =
                schedulerType == JobManagerOptions.SchedulerType.AdaptiveBatch;

        return isAdaptiveJoinEnabled && isAdaptiveBatchSchedulerEnabled;
    }

    private boolean areAllInputsHashShuffle(ExecNode<?> node) {
        for (InputProperty inputProperty : node.getInputProperties()) {
            if (inputProperty.getRequiredDistribution().getType() != DistributionType.HASH) {
                return false;
            }
        }
        return true;
    }

    private void replaceInputEdge(ExecNode<?> newNode, ExecNode<?> originalNode) {
        List<ExecEdge> inputEdges = new ArrayList<>();
        for (int i = 0; i < originalNode.getInputEdges().size(); ++i) {
            ExecEdge edge = originalNode.getInputEdges().get(i);
            inputEdges.add(
                    ExecEdge.builder()
                            .source(edge.getSource())
                            .target(newNode)
                            .shuffle(edge.getShuffle())
                            .exchangeMode(edge.getExchangeMode())
                            .build());
        }
        newNode.setInputEdges(inputEdges);
    }
}
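
The gating logic in isAdaptiveJoinEnabled above means the rewrite only fires when the adaptive broadcast join strategy is not NONE, broadcast hash join is not disabled, and the adaptive batch scheduler is in use. Below is a minimal sketch of a job setup that satisfies those conditions; note that the AUTO strategy value is an assumption (only the NONE member of AdaptiveBroadcastJoinStrategy appears in this diff), while the option keys and builder calls are standard Flink API.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.OptimizerConfigOptions;

public class AdaptiveJoinSetupSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The processor requires the adaptive batch scheduler; it is also the
        // fallback when no scheduler type is configured (see the orElse above).
        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);

        // AdaptiveJoinProcessor throws a TableException for streaming jobs,
        // so the environment must be in batch mode.
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance()
                                .inBatchMode()
                                .withConfiguration(conf)
                                .build());

        // The strategy must not be NONE for join nodes to be rewritten.
        // AUTO is assumed here; it is not shown in this commit.
        tEnv.getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
                        OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.AUTO);
    }
}

With that in place, BatchPlanner appends the processor after ForwardHashExchangeProcessor (see the BatchPlanner.scala change below), so required shuffle properties are already fixed when the join nodes are inspected.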

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala

Lines changed: 3 additions & 2 deletions
@@ -25,11 +25,11 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, InternalPlan}
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{ModifyOperation, Operation}
+import org.apache.flink.table.operations.Operation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode
-import org.apache.flink.table.planner.plan.nodes.exec.processor.{DeadlockBreakupProcessor, DynamicFilteringDependencyProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor}
+import org.apache.flink.table.planner.plan.nodes.exec.processor.{AdaptiveJoinProcessor, DeadlockBreakupProcessor, DynamicFilteringDependencyProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor}
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodePlanDumper
 import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
@@ -81,6 +81,7 @@ class BatchPlanner(
       processors.add(new MultipleInputNodeCreationProcessor(false))
     }
     processors.add(new ForwardHashExchangeProcessor)
+    processors.add(new AdaptiveJoinProcessor)
     processors
   }
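For context, the contract the new class implements is small: an ExecNodeGraphProcessor receives the whole ExecNodeGraph and returns a (possibly rewritten) graph for the next processor in the BatchPlanner chain. A minimal hypothetical no-op implementation, assuming only the interface and ProcessorContext types used above:

package org.apache.flink.table.planner.plan.nodes.exec.processor;

import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;

/** A hypothetical no-op processor illustrating the ExecNodeGraphProcessor contract. */
public class NoOpProcessor implements ExecNodeGraphProcessor {

    @Override
    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
        // Inspect or rewrite execGraph here; AdaptiveJoinProcessor walks it with an
        // AbstractExecNodeExactlyOnceVisitor and swaps qualified join nodes.
        // Returning the graph unchanged leaves the plan exactly as the upstream
        // processors (e.g. ForwardHashExchangeProcessor) produced it.
        return execGraph;
    }
}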