From 49d0a759ada228c2be87a57b063f62db2e6aa0d6 Mon Sep 17 00:00:00 2001
From: sunxia
Date: Thu, 7 Nov 2024 11:24:41 +0800
Subject: [PATCH] [FLINK-36607][table-planner] Introduce AdaptiveJoinProcessor
 to inject adaptive join node.

---
 .../exec/processor/AdaptiveJoinProcessor.java | 179 ++++++++++++++
 .../planner/delegation/BatchPlanner.scala     |   5 +-
 .../batch/sql/adaptive/AdaptiveJoinTest.xml   | 231 ++++++++++++++++++
 .../batch/sql/adaptive/AdaptiveJoinTest.scala | 123 ++++++++++
 4 files changed, 536 insertions(+), 2 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java
 create mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.xml
 create mode 100644 flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.scala

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java
new file mode 100644
index 0000000000000..cbe7386c78a1b
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.processor;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.AdaptiveJoinExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecAdaptiveJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.utils.OperatorType;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType.KEEP_INPUT_AS_IS;
+
+/**
+ * An {@link ExecNodeGraphProcessor} that replaces qualified join nodes with adaptive join nodes.
+ */
+public class AdaptiveJoinProcessor implements ExecNodeGraphProcessor {
+
+    @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+        if (execGraph.getRootNodes().get(0) instanceof StreamExecNode) {
+            throw new TableException("AdaptiveJoin does not support streaming jobs.");
+        }
+        if (!isAdaptiveJoinEnabled(context)) {
+            return execGraph;
+        }
+
+        AbstractExecNodeExactlyOnceVisitor visitor =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        visitInputs(node);
+                        // AdaptiveJoin conversion should be avoided when there is a
+                        // KEEP_INPUT_AS_IS constraint downstream. And we don't need to check all
+                        // downstream nodes of the join, because the KEEP_INPUT_AS_IS constraint
+                        // will be bound to BatchExecExchange, which will be the direct downstream
+                        // node of the join.
+                        if (shouldKeepInputAsIs(node.getInputProperties())) {
+                            return;
+                        }
+                        for (int i = 0; i < node.getInputEdges().size(); ++i) {
+                            ExecEdge edge = node.getInputEdges().get(i);
+                            ExecNode<?> newNode = tryReplaceWithAdaptiveJoinNode(edge.getSource());
+                            node.replaceInputEdge(
+                                    i,
+                                    ExecEdge.builder()
+                                            .source(newNode)
+                                            .target(node)
+                                            .shuffle(edge.getShuffle())
+                                            .exchangeMode(edge.getExchangeMode())
+                                            .build());
+                        }
+                    }
+                };
+
+        List<ExecNode<?>> newRootNodes =
+                execGraph.getRootNodes().stream()
+                        .map(
+                                node -> {
+                                    node = tryReplaceWithAdaptiveJoinNode(node);
+                                    node.accept(visitor);
+                                    return node;
+                                })
+                        .collect(Collectors.toList());
+
+        return new ExecNodeGraph(execGraph.getFlinkVersion(), newRootNodes);
+    }
+
+    private ExecNode<?> tryReplaceWithAdaptiveJoinNode(ExecNode<?> node) {
+        // For AdaptiveJoin to be converted, its upstream inputs must ensure:
+        // 1. Data distribution is by Hash.
+        // 2. No upstream nodes require KEEP_INPUT_AS_IS (achieved by inserting
+        // BatchExecExchange).
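+        // (Condition 1 is verified by areAllInputsHashShuffle below; condition 2 by
+        // shouldKeepUpstreamExchangeInputAsIs, which checks the KEEP_INPUT_AS_IS property
+        // on upstream BatchExecExchange inputs.)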
+        if (!(areAllInputsHashShuffle(node))
+                || shouldKeepUpstreamExchangeInputAsIs(node.getInputEdges())) {
+            return node;
+        }
+        ExecNode<?> newNode = node;
+        if (node instanceof AdaptiveJoinExecNode
+                && ((AdaptiveJoinExecNode) node).canBeTransformedToAdaptiveJoin()) {
+            BatchExecAdaptiveJoin adaptiveJoin = ((AdaptiveJoinExecNode) node).toAdaptiveJoinNode();
+            replaceInputEdge(adaptiveJoin, node);
+            newNode = adaptiveJoin;
+        }
+
+        return newNode;
+    }
+
+    private boolean shouldKeepInputAsIs(List<InputProperty> inputProperties) {
+        return inputProperties.stream()
+                .anyMatch(
+                        inputProperty ->
+                                inputProperty.getRequiredDistribution().getType()
+                                        == KEEP_INPUT_AS_IS);
+    }
+
+    // If a KEEP_INPUT_AS_IS constraint exists on an operator, it will always appear on its
+    // upstream BatchExecExchange.
+    private boolean shouldKeepUpstreamExchangeInputAsIs(List<ExecEdge> inputEdges) {
+        return inputEdges.stream()
+                .filter(execEdge -> execEdge.getSource() instanceof BatchExecExchange)
+                .map(execEdge -> (BatchExecExchange) execEdge.getSource())
+                .anyMatch(exchange -> shouldKeepInputAsIs(exchange.getInputProperties()));
+    }
+
+    private boolean isAdaptiveJoinEnabled(ProcessorContext context) {
+        TableConfig tableConfig = context.getPlanner().getTableConfig();
+        boolean isAdaptiveJoinEnabled =
+                tableConfig.get(
+                                OptimizerConfigOptions
+                                        .TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY)
+                        != OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.NONE
+                        && !TableConfigUtils.isOperatorDisabled(
+                                tableConfig, OperatorType.BroadcastHashJoin);
+        JobManagerOptions.SchedulerType schedulerType =
+                context.getPlanner()
+                        .getExecEnv()
+                        .getConfig()
+                        .getSchedulerType()
+                        .orElse(JobManagerOptions.SchedulerType.AdaptiveBatch);
+        boolean isAdaptiveBatchSchedulerEnabled =
+                schedulerType == JobManagerOptions.SchedulerType.AdaptiveBatch;
+
+        return isAdaptiveJoinEnabled && isAdaptiveBatchSchedulerEnabled;
+    }
+
+    private boolean areAllInputsHashShuffle(ExecNode<?> node) {
+        for (InputProperty inputProperty : node.getInputProperties()) {
+            if (inputProperty.getRequiredDistribution().getType() != DistributionType.HASH) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void replaceInputEdge(ExecNode<?> newNode, ExecNode<?> originalNode) {
+        List<ExecEdge> inputEdges = new ArrayList<>();
+        for (int i = 0; i < originalNode.getInputEdges().size(); ++i) {
+            ExecEdge edge = originalNode.getInputEdges().get(i);
+            inputEdges.add(
+                    ExecEdge.builder()
+                            .source(edge.getSource())
+                            .target(newNode)
+                            .shuffle(edge.getShuffle())
+                            .exchangeMode(edge.getExchangeMode())
+                            .build());
+        }
+        newNode.setInputEdges(inputEdges);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 120c5f70b6fbe..6e8febadec547 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -25,11 +25,11 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog}
 import org.apache.flink.table.delegation.{Executor, InternalPlan}
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{ModifyOperation, Operation}
+import org.apache.flink.table.operations.Operation
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode
-import org.apache.flink.table.planner.plan.nodes.exec.processor.{DeadlockBreakupProcessor, DynamicFilteringDependencyProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor}
+import org.apache.flink.table.planner.plan.nodes.exec.processor.{AdaptiveJoinProcessor, DeadlockBreakupProcessor, DynamicFilteringDependencyProcessor, ExecNodeGraphProcessor, ForwardHashExchangeProcessor, MultipleInputNodeCreationProcessor}
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodePlanDumper
 import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
@@ -81,6 +81,7 @@ class BatchPlanner(
       processors.add(new MultipleInputNodeCreationProcessor(false))
     }
     processors.add(new ForwardHashExchangeProcessor)
+    processors.add(new AdaptiveJoinProcessor)
     processors
   }
 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.xml
new file mode 100644
index 0000000000000..d40ea85454a46
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.xml
@@ -0,0 +1,231 @@
[The 231-line XML body of this test resource, containing the expected optimized exec plans for the AdaptiveJoinTest cases below, was lost in extraction; only the diff markers survived.]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.scala
new file mode 100644
index 0000000000000..790be6da364b7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.batch.sql.adaptive
+
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+/** Tests for AdaptiveJoinProcessor. */
+class AdaptiveJoinTest extends TableTestBase {
+
+  private val util = batchTestUtil()
+
+  @BeforeEach
+  def before(): Unit = {
+    util.addTableSource[(Long, Long, String, Long)]("T", 'a, 'b, 'c, 'd)
+    util.addTableSource[(Long, Long, String, Long)]("T1", 'a1, 'b1, 'c1, 'd1)
+    util.addTableSource[(Long, Long, String, Long)]("T2", 'a2, 'b2, 'c2, 'd2)
+    util.addTableSource[(Long, Long, String, Long)]("T3", 'a3, 'b3, 'c3, 'd3)
+    util.tableConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
+      OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.AUTO)
+  }
+
+  @Test
+  def testWithShuffleHashJoin(): Unit = {
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,SortMergeJoin")
+    val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testWithShuffleMergeJoin(): Unit = {
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin,ShuffleHashJoin")
+    val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+    util.verifyExecPlan(sql)
+  }
+
+  // Join nodes that have already been converted to a broadcast join during the compilation
+  // phase will not be transformed further.
+  @Test
+  def testWithStaticBroadcastJoin(): Unit = {
+    util.tableEnv.getConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS,
+      "SortMergeJoin,ShuffleHashJoin,NestedLoopJoin")
+    util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(Long.MaxValue))
+    val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testWithBroadcastJoinRuntimeOnly(): Unit = {
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "NestedLoopJoin")
+    util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, Long.box(Long.MaxValue))
+    util.tableEnv.getConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
+      OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.RUNTIME_ONLY)
+    val sql = "SELECT * FROM T1, T2 WHERE a1 = a2"
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinWithUnionInput(): Unit = {
+    val sql =
+      """
+        |SELECT * FROM
+        |  (SELECT a FROM (SELECT a1 as a FROM T1) UNION ALL (SELECT a2 as a FROM T2)) Y
+        |  LEFT JOIN T ON T.a = Y.a
+        |""".stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  // AdaptiveJoin does not support the ForwardForConsecutiveHash case, as it may lead to
+  // incorrect data.
+  @Test
+  def testShuffleJoinWithForwardForConsecutiveHash(): Unit = {
+    util.tableEnv.getConfig
+      .set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED, Boolean.box(false))
+    val sql =
+      """
+        |WITH
+        |  r AS (SELECT * FROM T1, T2, T3 WHERE a1 = a2 and a1 = a3)
+        |SELECT sum(b1) FROM r group by a1
+        |""".stripMargin
+    util.verifyExecPlan(sql)
+  }
+
+  // AdaptiveJoin does not support the MultipleInput case, because the optimizer cannot see
+  // the join node inside a MultipleInput node; this might be supported in the future.
+  @Test
+  def testJoinWithMultipleInput(): Unit = {
+    val sql =
+      """
+        |SELECT * FROM
+        |  (SELECT a FROM T1 JOIN T ON a = a1) t1
+        |  INNER JOIN
+        |  (SELECT d2 FROM T JOIN T2 ON d2 = a) t2
+        |ON t1.a = t2.d2
+        |""".stripMargin
+    util.verifyExecPlan(sql)
+  }
+}
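
Note: the snippet below is a minimal usage sketch, not part of the patch. It shows how the
adaptive broadcast join introduced here would be switched on from a user program. Only the
config options referenced above are taken from the patch; the session setup and table names
are illustrative assumptions.

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    import org.apache.flink.table.api.config.OptimizerConfigOptions

    object AdaptiveJoinExample {
      def main(args: Array[String]): Unit = {
        // Batch mode is required: AdaptiveJoinProcessor rejects streaming jobs, and it only
        // runs under the AdaptiveBatch scheduler (the default for batch jobs).
        val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())
        // NONE disables the rewrite entirely (see isAdaptiveJoinEnabled); AUTO and
        // RUNTIME_ONLY are the strategies exercised by the tests above.
        tEnv.getConfig.set(
          OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
          OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.AUTO)
        // Assumes tables T1 and T2 were registered beforehand.
        tEnv.executeSql("SELECT * FROM T1 JOIN T2 ON T1.a1 = T2.a2").print()
      }
    }

As the guards in isAdaptiveJoinEnabled show, the rewrite is also skipped if BroadcastHashJoin
is listed in table.exec.disabled-operators.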