From 40b9052e9f2618fbabab26f9b3fd4158528921ed Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Fri, 28 Feb 2025 15:08:06 +0100 Subject: [PATCH] [FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink * Add information about DELETE to ChangelogMode * Adapt FlinkChangelogModeInferenceProgram to remove ChangelogNormalize if possible --- .../flink/table/connector/ChangelogMode.java | 50 ++- ...ChangelogNormalizeRequirementResolver.java | 179 ++++++++ ...reamNonDeterministicUpdatePlanVisitor.java | 21 +- .../table/planner/plan/trait/DeleteKind.java | 35 ++ .../planner/plan/utils/FlinkRelUtil.java | 19 + .../FlinkChangelogModeInferenceProgram.scala | 419 ++++++++++++++++-- .../planner/plan/trait/DeleteKindTrait.scala | 100 +++++ .../plan/trait/DeleteKindTraitDef.scala | 50 +++ .../plan/utils/ChangelogPlanUtils.scala | 18 +- .../factories/TestValuesTableFactory.java | 50 ++- .../exec/stream/DeletesByKeyPrograms.java | 307 +++++++++++++ .../stream/DeletesByKeySemanticTests.java | 39 ++ .../ChangelogNormalizeOptimizationTest.java | 301 +++++++++++++ .../stream/sql/DataStreamJavaITCase.java | 4 +- .../ChangelogNormalizeOptimizationTest.xml | 418 +++++++++++++++++ .../planner/plan/stream/sql/TableScanTest.xml | 14 +- .../stream/sql/NonDeterministicDagTest.scala | 4 +- .../plan/stream/sql/TableScanTest.scala | 9 +- .../table/planner/utils/TableTestBase.scala | 11 +- 19 files changed, 1969 insertions(+), 79 deletions(-) create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala create mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java create mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java index 9d4a5fd500fa2..467d374d18397 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; /** * The set of changes contained in a changelog. 
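As a minimal sketch of how the extension below is meant to be used (the helper class and method names are made up for illustration and are not part of the patch): a source whose DELETE rows carry only the key fields can declare key-only deletes via the new builder flag, while ChangelogMode.upsert(false) requests full deletes.

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

/** Hypothetical helper, only to illustrate the new keyOnlyDeletes flag. */
final class ExampleChangelogModes {

    /** Same mode as ChangelogMode.upsert(), which defaults to key-only deletes after this patch. */
    static ChangelogMode upsertWithKeyOnlyDeletes() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .keyOnlyDeletes(true)
                .build();
    }

    /** Upsert mode whose DELETE rows are expected to carry the full row. */
    static ChangelogMode upsertWithFullDeletes() {
        return ChangelogMode.upsert(false);
    }
}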
@@ -43,6 +44,15 @@ public final class ChangelogMode { .addContainedKind(RowKind.INSERT) .addContainedKind(RowKind.UPDATE_AFTER) .addContainedKind(RowKind.DELETE) + .keyOnlyDeletes(true) + .build(); + + private static final ChangelogMode UPSERT_WITH_FULL_DELETES = + ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .keyOnlyDeletes(false) .build(); private static final ChangelogMode ALL = @@ -54,11 +64,13 @@ public final class ChangelogMode { .build(); private final Set kinds; + private final boolean keyOnlyDeletes; - private ChangelogMode(Set kinds) { + private ChangelogMode(Set kinds, boolean keyOnlyDeletes) { Preconditions.checkArgument( kinds.size() > 0, "At least one kind of row should be contained in a changelog."); this.kinds = Collections.unmodifiableSet(kinds); + this.keyOnlyDeletes = keyOnlyDeletes; } /** Shortcut for a simple {@link RowKind#INSERT}-only changelog. */ @@ -71,7 +83,21 @@ public static ChangelogMode insertOnly() { * contain {@link RowKind#UPDATE_BEFORE} rows. */ public static ChangelogMode upsert() { - return UPSERT; + return upsert(true); + } + + /** + * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does not + * contain {@link RowKind#UPDATE_BEFORE} rows. + * + * @param keyOnlyDeletes Tells the system the DELETEs contain just the key. + */ + public static ChangelogMode upsert(boolean keyOnlyDeletes) { + if (keyOnlyDeletes) { + return UPSERT; + } else { + return UPSERT_WITH_FULL_DELETES; + } } /** Shortcut for a changelog that can contain all {@link RowKind}s. */ @@ -96,6 +122,10 @@ public boolean containsOnly(RowKind kind) { return kinds.size() == 1 && kinds.contains(kind); } + public boolean keyOnlyDeletes() { + return keyOnlyDeletes; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -115,7 +145,13 @@ public int hashCode() { @Override public String toString() { - return kinds.toString(); + if (!keyOnlyDeletes) { + return kinds.toString(); + } else { + return kinds.stream() + .map(kind -> kind == RowKind.DELETE ? "~D" : kind.toString()) + .collect(Collectors.joining(", ", "[", "]")); + } } // -------------------------------------------------------------------------------------------- @@ -125,6 +161,7 @@ public String toString() { public static class Builder { private final Set kinds = EnumSet.noneOf(RowKind.class); + private boolean keyOnlyDeletes = false; private Builder() { // default constructor to allow a fluent definition @@ -135,8 +172,13 @@ public Builder addContainedKind(RowKind kind) { return this; } + public Builder keyOnlyDeletes(boolean flag) { + this.keyOnlyDeletes = flag; + return this; + } + public ChangelogMode build() { - return new ChangelogMode(kinds); + return new ChangelogMode(kinds, keyOnlyDeletes); } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java new file mode 100644 index 0000000000000..2191fd519409f --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; +import org.apache.flink.table.planner.connectors.DynamicSourceUtils; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$; +import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$; +import org.apache.flink.table.planner.plan.utils.FlinkRelUtil; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Checks if it is safe to remove ChangelogNormalize as part of {@link + * FlinkChangelogModeInferenceProgram}. It checks: + * + *
    + *
<ul>
 *   <li>if there is no filter pushed into the changelog normalize
 *   <li>if we don't need to produce UPDATE_BEFORE
 *   <li>we don't access any metadata columns
 * </ul>
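 *
 * <p>For instance, the pipeline {@code INSERT INTO sink_t SELECT id, name, `value` FROM source_t}
 * from the DeletesByKeyPrograms tests added in this patch, reading an upsert source that emits
 * key-only deletes and writing to an upsert sink that accepts them, meets all of the conditions
 * above, so its ChangelogNormalize node can be dropped.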
+ */ +public class ChangelogNormalizeRequirementResolver { + + /** Checks if it is safe to remove ChangelogNormalize. */ + public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) { + if (normalize.filterCondition() != null) { + return true; + } + if (!Objects.equals( + normalize.getTraitSet().getTrait(UpdateKindTraitDef$.MODULE$.INSTANCE()), + UpdateKindTrait$.MODULE$.ONLY_UPDATE_AFTER())) { + return true; + } + + // check if metadata columns are accessed + final RelNode input = normalize.getInput(); + + return visit(input, bitSetForAllOutputColumns(input)); + } + + private static ImmutableBitSet bitSetForAllOutputColumns(RelNode input) { + return ImmutableBitSet.builder().set(0, input.getRowType().getFieldCount()).build(); + } + + private static boolean visit(final RelNode rel, final ImmutableBitSet requiredColumns) { + if (rel instanceof StreamPhysicalCalcBase) { + return visitCalc((StreamPhysicalCalcBase) rel, requiredColumns); + } else if (rel instanceof StreamPhysicalTableSourceScan) { + return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, requiredColumns); + } else if (rel instanceof StreamPhysicalWatermarkAssigner + || rel instanceof StreamPhysicalExchange) { + // require all input columns + final RelNode input = ((SingleRel) rel).getInput(); + return visit(input, bitSetForAllOutputColumns(input)); + } else { + // these nodes should not be in an input of a changelog normalize + // StreamPhysicalChangelogNormalize + // StreamPhysicalDropUpdateBefore + // StreamPhysicalUnion + // StreamPhysicalSort + // StreamPhysicalLimit + // StreamPhysicalSortLimit + // StreamPhysicalTemporalSort + // StreamPhysicalWindowTableFunction + // StreamPhysicalWindowRank + // StreamPhysicalWindowDeduplicate + // StreamPhysicalRank + // StreamPhysicalOverAggregateBase + // CommonPhysicalJoin + // StreamPhysicalMatch + // StreamPhysicalMiniBatchAssigner + // StreamPhysicalExpand + // StreamPhysicalWindowAggregateBase + // StreamPhysicalGroupAggregateBase + // StreamPhysicalSink + // StreamPhysicalLegacySink + // StreamPhysicalCorrelateBase + // StreamPhysicalLookupJoin + // StreamPhysicalValues + // StreamPhysicalDataStreamScan + // StreamPhysicalLegacyTableSourceScan + throw new UnsupportedOperationException( + String.format( + "Unsupported to visit node %s. 
The node either should not be pushed" + + " through the changelog normalize or is not supported yet.", + rel.getClass().getSimpleName())); + } + } + + private static boolean visitTableSourceScan( + StreamPhysicalTableSourceScan tableScan, ImmutableBitSet requiredColumns) { + if (!(tableScan.tableSource() instanceof SupportsReadingMetadata)) { + // source does not have metadata, no need to check + return false; + } + final TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class); + assert sourceTable != null; + // check if requiredColumns contain metadata column + final List metadataColumns = + DynamicSourceUtils.extractMetadataColumns( + sourceTable.contextResolvedTable().getResolvedSchema()); + final Set metaColumnSet = + metadataColumns.stream().map(Column::getName).collect(Collectors.toSet()); + final List columns = tableScan.getRowType().getFieldNames(); + for (int index = 0; index < columns.size(); index++) { + String column = columns.get(index); + if (metaColumnSet.contains(column) && requiredColumns.get(index)) { + // we require metadata column, therefore, we cannot remove the changelog normalize + return true; + } + } + + return false; + } + + private static boolean visitCalc(StreamPhysicalCalcBase calc, ImmutableBitSet requiredColumns) { + // evaluate required columns from input + final List projects = + calc.getProgram().getProjectList().stream() + .map(expr -> calc.getProgram().expandLocalRef(expr)) + .collect(Collectors.toList()); + final Map> outFromSourcePos = + FlinkRelUtil.extractSourceMapping(projects); + final List conv2Inputs = + requiredColumns.toList().stream() + .map( + out -> + Optional.ofNullable(outFromSourcePos.get(out)) + .orElseThrow( + () -> + new IllegalStateException( + String.format( + "Invalid pos:%d over projection:%s", + out, + calc + .getProgram())))) + .flatMap(Collection::stream) + .filter(index -> index != -1) + .distinct() + .collect(Collectors.toList()); + return visit(calc.getInput(), ImmutableBitSet.of(conv2Inputs)); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java index 98a1a6782cf1a..aaa4c4ff845d2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java @@ -62,6 +62,7 @@ import org.apache.flink.table.planner.plan.schema.TableSourceTable; import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils; import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil; +import org.apache.flink.table.planner.plan.utils.FlinkRelUtil; import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; import org.apache.flink.table.planner.plan.utils.JoinUtil; import org.apache.flink.table.planner.plan.utils.OverAggregateUtil; @@ -78,7 +79,6 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; -import org.apache.calcite.rex.RexSlot; import org.apache.calcite.sql.SqlExplainLevel; import org.apache.calcite.util.ImmutableBitSet; @@ -297,7 +297,8 @@ private StreamPhysicalRel visitCalc( calc.getProgram().getProjectList().stream() .map(expr -> calc.getProgram().expandLocalRef(expr)) 
.collect(Collectors.toList()); - Map> outFromSourcePos = extractSourceMapping(projects); + Map> outFromSourcePos = + FlinkRelUtil.extractSourceMapping(projects); List conv2Inputs = requireDeterminism.toList().stream() .map( @@ -812,22 +813,6 @@ private StreamPhysicalRel visitJoinChild( return transmitDeterminismRequirement(rel, inputRequireDeterminism); } - /** Extracts the out from source field index mapping of the given projects. */ - private Map> extractSourceMapping(final List projects) { - Map> mapOutFromInPos = new HashMap<>(); - - for (int index = 0; index < projects.size(); index++) { - RexNode expr = projects.get(index); - mapOutFromInPos.put( - index, - FlinkRexUtil.findAllInputRefs(expr).stream() - .mapToInt(RexSlot::getIndex) - .boxed() - .collect(Collectors.toList())); - } - return mapOutFromInPos; - } - private void checkNonDeterministicRexProgram( final ImmutableBitSet requireDeterminism, final RexProgram program, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java new file mode 100644 index 0000000000000..8f864f7ac4849 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.trait; + +/** Lists all kinds of {@link ModifyKind#DELETE} operation. */ +public enum DeleteKind { + + /** This kind indicates that operators do not emit {@link ModifyKind#DELETE} operation. */ + NONE, + + /** + * This kind indicates that operators can emit deletes with the key only. The rest of the row + * may be not present. + */ + DELETE_BY_KEY, + + /** This kind indicates that operators should emit deletes with the full row. 
*/ + FULL_DELETE +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java index a920a6133ff5c..ee39cdb807bb6 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java @@ -28,12 +28,15 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgram; import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexSlot; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexVisitorImpl; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; /** Utilities for {@link RelNode}. */ @@ -170,6 +173,22 @@ public static int[] initializeArray(int length, int initVal) { return array; } + /** Extracts the out from source field index mapping of the given projects. */ + public static Map> extractSourceMapping(final List projects) { + Map> mapOutFromInPos = new HashMap<>(); + + for (int index = 0; index < projects.size(); index++) { + RexNode expr = projects.get(index); + mapOutFromInPos.put( + index, + FlinkRexUtil.findAllInputRefs(expr).stream() + .mapToInt(RexSlot::getIndex) + .boxed() + .collect(Collectors.toList())); + } + return mapOutFromInPos; + } + /** * An InputRefCounter that count every inputRef's reference count number, every reference will * be counted, e.g., '$0 + 1' & '$0 + 2' will count 2 instead of 1. diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 620cd5802ab1d..988bcbc355455 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -23,9 +23,11 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize import org.apache.flink.table.connector.ChangelogMode import org.apache.flink.table.planner.plan.`trait`._ +import org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, fullDeleteOrNone, DELETE_BY_KEY, FULL_DELETE} import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER} import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.physical.stream._ +import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver import org.apache.flink.table.planner.plan.utils._ import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy} import org.apache.flink.table.planner.sinks.DataStreamTableSink @@ -70,12 +72,33 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti } val updateKindTraitVisitor = new 
SatisfyUpdateKindTraitVisitor(context) - val finalRoot = requiredUpdateKindTraits.flatMap { + val updateRoot = requiredUpdateKindTraits.flatMap { requiredUpdateKindTrait => updateKindTraitVisitor.visit(rootWithModifyKindSet, requiredUpdateKindTrait) } - // step3: sanity check and return non-empty root + // step3: satisfy DeleteKind trait + val requiredDeleteKindTraits = if (rootModifyKindSet.contains(ModifyKind.DELETE)) { + root match { + case _: StreamPhysicalSink => + // try DELETE_BY_KEY first, and then FULL_DELETE + Seq(DeleteKindTrait.DELETE_BY_KEY, DeleteKindTrait.FULL_DELETE) + case _ => + // for non-sink nodes prefer full deletes + Seq(DeleteKindTrait.FULL_DELETE) + } + } else { + // there is no deletes + Seq(DeleteKindTrait.NONE) + } + + val deleteKindTraitVisitor = new SatisfyDeleteKindTraitVisitor(context) + val finalRoot = requiredDeleteKindTraits.flatMap { + requiredDeleteKindTrait => + deleteKindTraitVisitor.visit(updateRoot.head, requiredDeleteKindTrait) + } + + // step4: sanity check and return non-empty root if (finalRoot.isEmpty) { val plan = FlinkRelOptUtil.toString(root, withChangelogTraits = true) throw new TableException( @@ -489,13 +512,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti * * @param rel * the node who should satisfy the requiredTrait - * @param requiredTrait + * @param requiredUpdateTrait * the required UpdateKindTrait * @return * A converted node which satisfies required traits by input nodes of current node. Or None if * required traits cannot be satisfied. */ - def visit(rel: StreamPhysicalRel, requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = + def visit( + rel: StreamPhysicalRel, + requiredUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match { case sink: StreamPhysicalSink => val sinkRequiredTraits = inferSinkRequiredTraits(sink) @@ -534,10 +559,10 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti _: StreamPhysicalWindowAggregate => // Aggregate, TableAggregate, Limit, GroupWindowAggregate, WindowAggregate, // and WindowTableAggregate requires update_before if there are updates - val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0))) - val children = visitChildren(rel, requiredChildTrait) + val requiredChildUpdateTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0))) + val children = visitChildren(rel, requiredChildUpdateTrait) // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind - createNewNode(rel, children, requiredTrait) + createNewNode(rel, children, requiredUpdateTrait) case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate | _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | @@ -546,14 +571,14 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, OverAggregate, // and IntervalJoin, WindowJoin require nothing about UpdateKind. 
val children = visitChildren(rel, UpdateKindTrait.NONE) - createNewNode(rel, children, requiredTrait) + createNewNode(rel, children, requiredUpdateTrait) case rank: StreamPhysicalRank => val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(rank, rank.partitionKey, rank.orderKey) visitRankStrategies( rankStrategies, - requiredTrait, + requiredUpdateTrait, rankStrategy => rank.copy(rankStrategy)) case sortLimit: StreamPhysicalSortLimit => @@ -563,16 +588,16 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti sortLimit.getCollation) visitRankStrategies( rankStrategies, - requiredTrait, + requiredUpdateTrait, rankStrategy => sortLimit.copy(rankStrategy)) case sort: StreamPhysicalSort => val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(sort.getInput)) val children = visitChildren(sort, requiredChildTrait) - createNewNode(sort, children, requiredTrait) + createNewNode(sort, children, requiredUpdateTrait) case join: StreamPhysicalJoin => - val onlyAfterByParent = requiredTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER + val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER val children = join.getInputs.zipWithIndex.map { case (child, childOrdinal) => val physicalChild = child.asInstanceOf[StreamPhysicalRel] @@ -592,7 +617,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti if (children.exists(_.isEmpty)) { None } else { - createNewNode(join, Some(children.flatten.toList), requiredTrait) + createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait) } case temporalJoin: StreamPhysicalTemporalJoin => @@ -601,7 +626,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // the left input required trait depends on it's parent in temporal join // the left input will send message to parent - val requiredUpdateBeforeByParent = requiredTrait.updateKind == UpdateKind.BEFORE_AND_AFTER + val requiredUpdateBeforeByParent = + requiredUpdateTrait.updateKind == UpdateKind.BEFORE_AND_AFTER val leftInputModifyKindSet = getModifyKindSet(left) val leftRequiredTrait = if (requiredUpdateBeforeByParent) { beforeAfterOrNone(leftInputModifyKindSet) @@ -630,7 +656,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti case calc: StreamPhysicalCalcBase => if ( - requiredTrait == UpdateKindTrait.ONLY_UPDATE_AFTER && + requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER && calc.getProgram.getCondition != null ) { // we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind, @@ -639,7 +665,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti None } else { // otherwise, forward UpdateKind requirement - visitChildren(rel, requiredTrait) match { + visitChildren(rel, requiredUpdateTrait) match { case None => None case Some(children) => val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE) @@ -652,7 +678,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction => // transparent forward requiredTrait to children - visitChildren(rel, requiredTrait) match { + visitChildren(rel, requiredUpdateTrait) match { case None => None case Some(children) => val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE) @@ -666,7 +692,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val 
requiredChildTrait = if (childModifyKindSet.isInsertOnly) { UpdateKindTrait.NONE } else { - requiredTrait + requiredUpdateTrait } this.visit(child, requiredChildTrait) }.toList @@ -704,7 +730,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val children = visitChildren(normalize, UpdateKindTrait.ONLY_UPDATE_AFTER) // use requiredTrait as providedTrait, // because changelog normalize supports all kinds of UpdateKind - createNewNode(rel, children, requiredTrait) + createNewNode(rel, children, requiredUpdateTrait) case ts: StreamPhysicalTableSourceScan => // currently only support BEFORE_AND_AFTER if source produces updates @@ -712,11 +738,11 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val newSource = createNewNode(rel, Some(List()), providedTrait) if ( providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) && - requiredTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER) + requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER) ) { // requiring only-after, but the source is CDC source, then drop update_before manually val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel) - createNewNode(dropUB, newSource.map(s => List(s)), requiredTrait) + createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait) } else { newSource } @@ -730,9 +756,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // we can't drop UPDATE_BEFORE if it is required by other parent blocks UpdateKindTrait.BEFORE_AND_AFTER } else { - requiredTrait + requiredUpdateTrait } - if (!providedTrait.satisfies(requiredTrait)) { + if (!providedTrait.satisfies(requiredUpdateTrait)) { // require ONLY_AFTER but can only provide BEFORE_AND_AFTER None } else { @@ -744,12 +770,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val children = process.getInputs.map { case child: StreamPhysicalRel => val childModifyKindSet = getModifyKindSet(child) - val requiredChildTrait = if (childModifyKindSet.isInsertOnly) { - UpdateKindTrait.NONE - } else { - UpdateKindTrait.BEFORE_AND_AFTER - } - this.visit(child, requiredChildTrait) + val requiredUpdateChildTrait = + if (childModifyKindSet.isInsertOnly) { + UpdateKindTrait.NONE + } else { + UpdateKindTrait.BEFORE_AND_AFTER + } + this.visit(child, requiredUpdateChildTrait) }.toList createNewNode(rel, Some(children.flatten), UpdateKindTrait.NONE) @@ -761,15 +788,15 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti private def visitChildren( parent: StreamPhysicalRel, - requiredChildrenTrait: UpdateKindTrait): Option[List[StreamPhysicalRel]] = { + requiredChildrenUpdateTrait: UpdateKindTrait): Option[List[StreamPhysicalRel]] = { val newChildren = for (child <- parent.getInputs) yield { - this.visit(child.asInstanceOf[StreamPhysicalRel], requiredChildrenTrait) match { + this.visit(child.asInstanceOf[StreamPhysicalRel], requiredChildrenUpdateTrait) match { case None => // return None if one of the children can't satisfy return None case Some(newChild) => - val providedTrait = newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE) - if (!providedTrait.satisfies(requiredChildrenTrait)) { + val providedUpdateTrait = newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE) + if (!providedUpdateTrait.satisfies(requiredChildrenUpdateTrait)) { // the provided trait can't satisfy required trait, thus we should return None. 
return None } @@ -782,13 +809,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti private def createNewNode( node: StreamPhysicalRel, childrenOption: Option[List[StreamPhysicalRel]], - providedTrait: UpdateKindTrait): Option[StreamPhysicalRel] = childrenOption match { + providedUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] = childrenOption match { case None => None case Some(children) => val modifyKindSetTrait = node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) val nodeDescription = node.getRelDetailedDescription - val isUpdateKindValid = providedTrait.updateKind match { + val isUpdateKindValid = providedUpdateTrait.updateKind match { case UpdateKind.NONE => !modifyKindSetTrait.modifyKindSet.contains(ModifyKind.UPDATE) case UpdateKind.BEFORE_AND_AFTER | UpdateKind.ONLY_UPDATE_AFTER => @@ -796,17 +823,19 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti } if (!isUpdateKindValid) { throw new TableException( - s"UpdateKindTrait $providedTrait conflicts with " + + s"UpdateKindTrait $providedUpdateTrait conflicts with " + s"ModifyKindSetTrait $modifyKindSetTrait. " + s"This is a bug in planner, please file an issue. \n" + s"Current node is $nodeDescription.") } - val newTraitSet = node.getTraitSet.plus(providedTrait) + + val newTraitSet = node.getTraitSet.plus(providedUpdateTrait) Some(node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]) } /** * Try all possible rank strategies and return the first viable new node. + * * @param rankStrategies * all possible supported rank strategy by current node * @param requiredUpdateKindTrait @@ -932,10 +961,324 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti } } - // ------------------------------------------------------------------------------------------- + /** + * A visitor which will try to satisfy the required [[DeleteKindTrait]] from root. + * + *

<p>After being traversed by this visitor, every node should have a correct [[DeleteKindTrait]], or + * None is returned if the planner does not support satisfying the required [[DeleteKindTrait]]. + */ + private class SatisfyDeleteKindTraitVisitor(private val context: StreamOptimizeContext) { + + /** + * Try to satisfy the required [[DeleteKindTrait]] from root. + * + *

<p>Each node first requires a DeleteKindTrait of its children. The required + * DeleteKindTrait may come from the node's parent, or from the node itself, depending on + * whether the node destroys the trait provided by its children or passes it through unchanged. + * + *

<p>If the node passes the children's DeleteKindTrait on without destroying it, it returns a + * new node with the new inputs and the forwarded DeleteKindTrait. + * + *

If the node will destroy the children's UpdateKindTrait, then the node itself needs to be + * converted, or a new node should be generated to satisfy the required trait, such as marking + * itself not to generate UPDATE_BEFORE, or generating a new node to filter UPDATE_BEFORE. + * + * @param rel + * the node who should satisfy the requiredTrait + * @param requiredTrait + * the required DeleteKindTrait + * @return + * A converted node which satisfies required traits by input nodes of current node. Or None if + * required traits cannot be satisfied. + */ + def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): Option[StreamPhysicalRel] = + rel match { + case sink: StreamPhysicalSink => + val sinkRequiredTraits = inferSinkRequiredTraits(sink) + visitSink(sink, sinkRequiredTraits) + + case sink: StreamPhysicalLegacySink[_] => + val childModifyKindSet = getModifyKindSet(sink.getInput) + val fullDelete = fullDeleteOrNone(childModifyKindSet) + visitSink(sink, Seq(fullDelete)) + + case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate | + _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate | + _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase | + _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: StreamPhysicalRank | + _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin | + _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin | + _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction | + _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate | + _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | + _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin | + _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin | + _: StreamPhysicalProcessTableFunction => + // if not explicitly supported, all operators require full deletes if there are updates + // ProcessTableFunction currently only consumes full deletes or insert-only + val children = rel.getInputs.map { + case child: StreamPhysicalRel => + val childModifyKindSet = getModifyKindSet(child) + this.visit(child, fullDeleteOrNone(childModifyKindSet)) + }.toList + createNewNode(rel, Some(children.flatten), fullDeleteOrNone(getModifyKindSet(rel))) + + case join: StreamPhysicalJoin => + val children = join.getInputs.zipWithIndex.map { + case (child, childOrdinal) => + val physicalChild = child.asInstanceOf[StreamPhysicalRel] + val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal) + val inputModifyKindSet = getModifyKindSet(physicalChild) + if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) { + this + .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet)) + .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))) + } else { + this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)) + } + } + if (children.exists(_.isEmpty)) { + None + } else { + val childRels = children.flatten.toList + if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) { + createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel))) + } else { + createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel))) + } + } + + case calc: StreamPhysicalCalcBase => + if ( + requiredTrait == DeleteKindTrait.DELETE_BY_KEY && + calc.getProgram.getCondition != null + ) { + // this can be further improved by checking if the filter condition is on the key + None + } else { + // otherwise, forward DeleteKind requirement + 
visitChildren(rel, requiredTrait) match { + case None => None + case Some(children) => + val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE) + createNewNode(rel, Some(children), childTrait) + } + } + + case _: StreamPhysicalExchange | _: StreamPhysicalExpand | + _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalDropUpdateBefore => + // transparent forward requiredTrait to children + visitChildren(rel, requiredTrait) match { + case None => None + case Some(children) => + val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE) + createNewNode(rel, Some(children), childTrait) + } + + case union: StreamPhysicalUnion => + val children = union.getInputs.map { + case child: StreamPhysicalRel => + val childModifyKindSet = getModifyKindSet(child) + val requiredChildTrait = if (!childModifyKindSet.contains(ModifyKind.DELETE)) { + DeleteKindTrait.NONE + } else { + requiredTrait + } + this.visit(child, requiredChildTrait) + }.toList + + if (children.exists(_.isEmpty)) { + None + } else { + val deleteKinds = children.flatten + .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)) + // union can just forward changes, can't actively satisfy to another changelog mode + val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) { + // if all the children is NONE, union is NONE + DeleteKindTrait.NONE + } else { + // otherwise, merge update kinds. + val merged = deleteKinds + .map(_.deleteKind) + .reduce { + (l, r) => + (l, r) match { + case (DeleteKind.NONE, r: DeleteKind) => r + case (l: DeleteKind, DeleteKind.NONE) => l + case (l: DeleteKind, r: DeleteKind) => + if (l == r) { + l + } else { + // if any of the union input produces DELETE_BY_KEY, the union produces + // delete by key + DeleteKind.DELETE_BY_KEY + } + } + } + new DeleteKindTrait(merged) + } + createNewNode(union, Some(children.flatten), providedTrait) + } + + case normalize: StreamPhysicalChangelogNormalize => + // if + // 1. we don't need to produce UPDATE_BEFORE, + // 2. children can satisfy the required delete trait, + // 3. the normalize doesn't have filter condition which we'd lose, + // 4. 
we don't use metadata columns + // we can skip ChangelogNormalize + if (!ChangelogNormalizeRequirementResolver.isRequired(normalize)) { + visitChildren(normalize, requiredTrait) match { + case Some(children) => + val input = children.head match { + case exchange: StreamPhysicalExchange => + exchange.getInput + case _ => + normalize.getInput + } + return Some(input.asInstanceOf[StreamPhysicalRel]) + case _ => + } + } + val childModifyKindTrait = getModifyKindSet(rel.getInput(0)) + + // prefer delete by key, but accept both + val children = visitChildren(normalize, deleteOnKeyOrNone(childModifyKindTrait)) + .orElse(visitChildren(normalize, fullDeleteOrNone(childModifyKindTrait))) + + // changelog normalize produces full deletes + createNewNode(rel, children, fullDeleteOrNone(getModifyKindSet(rel))) + + case ts: StreamPhysicalTableSourceScan => + // currently only support BEFORE_AND_AFTER if source produces updates + val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode) + createNewNode(rel, Some(List()), providedTrait) + + case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan | + _: StreamPhysicalValues => + createNewNode(rel, Some(List()), DeleteKindTrait.NONE) + + case _: StreamPhysicalIntermediateTableScan => + createNewNode(rel, Some(List()), fullDeleteOrNone(getModifyKindSet(rel))) + + case _ => + throw new UnsupportedOperationException( + s"Unsupported visit for ${rel.getClass.getSimpleName}") + + } + + private def visitChildren( + parent: StreamPhysicalRel, + requiredChildrenTrait: DeleteKindTrait): Option[List[StreamPhysicalRel]] = { + val newChildren = for (child <- parent.getInputs) yield { + this.visit(child.asInstanceOf[StreamPhysicalRel], requiredChildrenTrait) match { + case None => + // return None if one of the children can't satisfy + return None + case Some(newChild) => + val providedTrait = newChild.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE) + if (!providedTrait.satisfies(requiredChildrenTrait)) { + // the provided trait can't satisfy required trait, thus we should return None. + return None + } + newChild + } + } + Some(newChildren.toList) + } + + private def createNewNode( + node: StreamPhysicalRel, + childrenOption: Option[List[StreamPhysicalRel]], + providedDeleteTrait: DeleteKindTrait): Option[StreamPhysicalRel] = childrenOption match { + case None => + None + case Some(children) => + val modifyKindSetTrait = node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) + val nodeDescription = node.getRelDetailedDescription + val isDeleteKindValid = providedDeleteTrait.deleteKind match { + case DeleteKind.NONE => + !modifyKindSetTrait.modifyKindSet.contains(ModifyKind.DELETE) + case DeleteKind.DELETE_BY_KEY | DeleteKind.FULL_DELETE => + modifyKindSetTrait.modifyKindSet.contains(ModifyKind.DELETE) + } + if (!isDeleteKindValid) { + throw new TableException( + s"DeleteKindTrait $providedDeleteTrait conflicts with " + + s"ModifyKindSetTrait $modifyKindSetTrait. " + + s"This is a bug in planner, please file an issue. 
\n" + + s"Current node is $nodeDescription.") + } + val newTraitSet = node.getTraitSet.plus(providedDeleteTrait) + Some(node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]) + } + + private def visitSink( + sink: StreamPhysicalRel, + sinkRequiredTraits: Seq[DeleteKindTrait]): Option[StreamPhysicalRel] = { + val children = sinkRequiredTraits.flatMap(t => visitChildren(sink, t)) + if (children.isEmpty) { + None + } else { + val sinkTrait = sink.getTraitSet.plus(DeleteKindTrait.NONE) + Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel]) + } + } + + /** + * Infer sink required traits by the sink node and its input. Sink required traits is based on + * the sink node's changelog mode, the only exception is when sink's pk(s) not exactly the same + * as the changeLogUpsertKeys and sink' changelog mode is DELETE_BY_KEY. + */ + private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[DeleteKindTrait] = { + val childModifyKindSet = getModifyKindSet(sink.getInput) + val sinkChangelogMode = sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode) + + val sinkDeleteTrait = DeleteKindTrait.fromChangelogMode(sinkChangelogMode) + + val fullDelete = fullDeleteOrNone(childModifyKindSet) + if (sinkDeleteTrait.equals(DeleteKindTrait.DELETE_BY_KEY)) { + if (areUpsertKeysDifferentFromPk(sink)) { + Seq(fullDelete) + } else { + Seq(sinkDeleteTrait, fullDelete) + } + } else { + Seq(fullDelete) + } + } + + // ------------------------------------------------------------------------------------------- + + private def areUpsertKeysDifferentFromPk(sink: StreamPhysicalSink) = { + // if sink's pk(s) are not exactly match input changeLogUpsertKeys then it will fallback + // to beforeAndAfter mode for the correctness + var upsertKeyDifferentFromPk: Boolean = false + val sinkDefinedPks = sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes + + if (sinkDefinedPks.nonEmpty) { + val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery) + val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput) + // if input is UA only, primary key != upsert key (upsert key can be null) we should + // fallback to beforeAndAfter. + // Notice: even sink pk(s) contains input upsert key we cannot optimize to UA only, + // this differs from batch job's unique key inference + if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(_.equals(sinkPks))) { + upsertKeyDifferentFromPk = true + } + } + upsertKeyDifferentFromPk + } + } private def getModifyKindSet(node: RelNode): ModifyKindSet = { val modifyKindSetTrait = node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) modifyKindSetTrait.modifyKindSet } + + private def getDeleteKind(node: RelNode): DeleteKind = { + val deleteKindTrait = node.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE) + deleteKindTrait.deleteKind + } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala new file mode 100644 index 0000000000000..b23063bd35955 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.`trait` + +import org.apache.flink.table.connector.ChangelogMode +import org.apache.flink.types.RowKind + +import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef} + +/** DeleteKindTrait is used to describe the kind of delete operation. */ +class DeleteKindTrait(val deleteKind: DeleteKind) extends RelTrait { + + override def satisfies(relTrait: RelTrait): Boolean = relTrait match { + case other: DeleteKindTrait => + // should totally match + other.deleteKind == this.deleteKind + case _ => false + } + + override def getTraitDef: RelTraitDef[_ <: RelTrait] = DeleteKindTraitDef.INSTANCE + + override def register(planner: RelOptPlanner): Unit = {} + + override def hashCode(): Int = deleteKind.hashCode() + + override def equals(obj: Any): Boolean = obj match { + case t: DeleteKindTrait => this.deleteKind.equals(t.deleteKind) + case _ => false + } + + override def toString: String = s"[${deleteKind.toString}]" +} + +object DeleteKindTrait { + + /** An [[DeleteKindTrait]] that indicates the node does not support delete operation. */ + val NONE = new DeleteKindTrait(DeleteKind.NONE) + + /** An [[DeleteKindTrait]] that indicates the node supports deletes by key only. */ + val DELETE_BY_KEY = new DeleteKindTrait(DeleteKind.DELETE_BY_KEY) + + /** An [[DeleteKindTrait]] that indicates the node produces requires deletes by full records. */ + val FULL_DELETE = new DeleteKindTrait(DeleteKind.FULL_DELETE) + + /** + * Returns DELETE_BY_KEY [[DeleteKindTrait]] if there are delete changes. Otherwise, returns NONE + * [[DeleteKindTrait]]. + */ + def deleteOnKeyOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = { + val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) { + DeleteKind.DELETE_BY_KEY + } else { + DeleteKind.NONE + } + new DeleteKindTrait(deleteKind) + } + + /** + * Returns FULL_DELETE [[DeleteKindTrait]] if there are delete changes. Otherwise, returns NONE + * [[DeleteKindTrait]]. + */ + def fullDeleteOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = { + val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) { + DeleteKind.FULL_DELETE + } else { + DeleteKind.NONE + } + new DeleteKindTrait(deleteKind) + } + + /** Creates an instance of [[DeleteKindTrait]] from the given [[ChangelogMode]]. 
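   * For example, ChangelogMode.insertOnly() maps to NONE, ChangelogMode.upsert() (key-only
   * deletes by default) maps to DELETE_BY_KEY, and ChangelogMode.upsert(false) or
   * ChangelogMode.all() maps to FULL_DELETE.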
*/ + def fromChangelogMode(changelogMode: ChangelogMode): DeleteKindTrait = { + val hasDelete = changelogMode.contains(RowKind.DELETE) + if (!hasDelete) { + DeleteKindTrait.NONE + } else { + val hasDeleteOnKey = changelogMode.keyOnlyDeletes() + if (hasDeleteOnKey) { + DeleteKindTrait.DELETE_BY_KEY + } else { + DeleteKindTrait.FULL_DELETE + } + } + } +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala new file mode 100644 index 0000000000000..5d374c48bf12b --- /dev/null +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.planner.plan.`trait` + +import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef} +import org.apache.calcite.rel.RelNode + +/** Definition of a trait containing [[DeleteKindTrait]]. 
*/ +class DeleteKindTraitDef extends RelTraitDef[DeleteKindTrait] { + + override def getTraitClass: Class[DeleteKindTrait] = classOf[DeleteKindTrait] + + override def getSimpleName: String = this.getClass.getSimpleName + + override def convert( + planner: RelOptPlanner, + rel: RelNode, + toTrait: DeleteKindTrait, + allowInfiniteCostConverters: Boolean): RelNode = { + rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs) + } + + override def canConvert( + planner: RelOptPlanner, + fromTrait: DeleteKindTrait, + toTrait: DeleteKindTrait): Boolean = { + throw new UnsupportedOperationException("DeleteKindTrait conversion is not supported for now.") + } + + override def getDefault: DeleteKindTrait = DeleteKindTrait.FULL_DELETE +} + +object DeleteKindTraitDef { + val INSTANCE = new DeleteKindTraitDef() +} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala index 72eb3ecfc4d24..ae90a5d6d7f8e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.utils import org.apache.flink.table.connector.ChangelogMode -import org.apache.flink.table.planner.plan.`trait`.{ModifyKind, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef} +import org.apache.flink.table.planner.plan.`trait`.{DeleteKind, DeleteKindTraitDef, ModifyKind, ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef} import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram import org.apache.flink.types.RowKind @@ -85,6 +85,11 @@ object ChangelogPlanUtils { val updateKind = node.getTraitSet .getTrait(UpdateKindTraitDef.INSTANCE) .updateKind + val deleteKind = Option( + node.getTraitSet + .getTrait(DeleteKindTraitDef.INSTANCE)) + .map(_.deleteKind) + .getOrElse(DeleteKind.NONE) if (modifyKindSet.isEmpty) { None @@ -102,6 +107,11 @@ object ChangelogPlanUtils { modeBuilder.addContainedKind(RowKind.UPDATE_BEFORE) } } + + if (deleteKind == DeleteKind.DELETE_BY_KEY) { + modeBuilder.keyOnlyDeletes(true) + } + Some(modeBuilder.build()) } } @@ -121,7 +131,11 @@ object ChangelogPlanUtils { kinds += "UA" } if (mode.contains(RowKind.DELETE)) { - kinds += "D" + if (mode.keyOnlyDeletes()) { + kinds += "PD" + } else { + kinds += "D" + } } kinds.mkString(",") } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index 659e6b11209fb..f75cb778597f5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -457,6 +457,22 @@ private static RowKind parseRowKind(String rowKindShortString) { .withDescription( "Option to determine whether or not to require the distribution bucket count"); + private static final ConfigOption SINK_SUPPORTS_DELETE_BY_KEY = + ConfigOptions.key("sink.supports-delete-by-key") + .booleanType() + 
.defaultValue(false) + .withDescription( + "Option to determine whether or not to require deletes to have the" + + " entire row or is a delete by key sufficient."); + + private static final ConfigOption SOURCE_PRODUCES_DELETE_BY_KEY = + ConfigOptions.key("source.produces-delete-by-key") + .booleanType() + .defaultValue(false) + .withDescription( + "Option to determine whether or not to require deletes to have the" + + " entire row or is a delete by key sufficient."); + private static final ConfigOption SOURCE_NUM_ELEMENT_TO_SKIP = ConfigOptions.key("source.num-element-to-skip") .intType() @@ -504,7 +520,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { helper.validate(); - ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE)); + ChangelogMode changelogMode = + parseChangelogMode( + helper.getOptions().get(CHANGELOG_MODE), + helper.getOptions().get(SOURCE_PRODUCES_DELETE_BY_KEY)); String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE); boolean isBounded = helper.getOptions().get(BOUNDED); boolean isFinite = helper.getOptions().get(TERMINATING); @@ -749,6 +768,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); boolean requireBucketCount = helper.getOptions().get(SINK_BUCKET_COUNT_REQUIRED); + boolean supportsDeleteByKey = helper.getOptions().get(SINK_SUPPORTS_DELETE_BY_KEY); if (sinkClass.equals("DEFAULT")) { int rowTimeIndex = validateAndExtractRowtimeIndex( @@ -765,7 +785,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { changelogMode, rowTimeIndex, tableSchema, - requireBucketCount); + requireBucketCount, + supportsDeleteByKey); } else { try { return InstantiationUtil.instantiate( @@ -816,6 +837,8 @@ public Set> optionalOptions() { ENABLE_WATERMARK_PUSH_DOWN, SINK_DROP_LATE_EVENT, SINK_BUCKET_COUNT_REQUIRED, + SINK_SUPPORTS_DELETE_BY_KEY, + SOURCE_PRODUCES_DELETE_BY_KEY, SOURCE_NUM_ELEMENT_TO_SKIP, SOURCE_SLEEP_AFTER_ELEMENTS, SOURCE_SLEEP_TIME, @@ -916,6 +939,10 @@ private static Map, Collection> mapPartitionToRow( } private ChangelogMode parseChangelogMode(String string) { + return parseChangelogMode(string, false); + } + + private ChangelogMode parseChangelogMode(String string, boolean producesDeleteByKey) { ChangelogMode.Builder builder = ChangelogMode.newBuilder(); for (String split : string.split(",")) { switch (split.trim()) { @@ -935,6 +962,7 @@ private ChangelogMode parseChangelogMode(String string) { throw new IllegalArgumentException("Invalid ChangelogMode string: " + string); } } + builder.keyOnlyDeletes(producesDeleteByKey); return builder.build(); } @@ -1621,7 +1649,7 @@ private static class TestValuesScanTableSourceWithWatermarkPushDown implements SupportsWatermarkPushDown, SupportsSourceWatermark { private final String tableName; - private WatermarkStrategy watermarkStrategy; + private WatermarkStrategy watermarkStrategy = WatermarkStrategy.noWatermarks(); private TestValuesScanTableSourceWithWatermarkPushDown( DataType producedDataType, @@ -2207,6 +2235,7 @@ private static class TestValuesTableSink private final int rowtimeIndex; private final TableSchema tableSchema; private final boolean requireBucketCount; + private final boolean supportsDeleteByKey; private TestValuesTableSink( DataType consumedDataType, @@ -2220,7 +2249,8 @@ private TestValuesTableSink( @Nullable ChangelogMode changelogModeEnforced, int rowtimeIndex, TableSchema tableSchema, - boolean requireBucketCount) { + boolean 
requireBucketCount, + boolean supportsDeleteByKey) { this.consumedDataType = consumedDataType; this.primaryKeyIndices = primaryKeyIndices; this.tableName = tableName; @@ -2233,10 +2263,19 @@ private TestValuesTableSink( this.rowtimeIndex = rowtimeIndex; this.tableSchema = tableSchema; this.requireBucketCount = requireBucketCount; + this.supportsDeleteByKey = supportsDeleteByKey; } @Override public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + final ChangelogMode mode = getMode(requestedMode); + final ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + mode.getContainedKinds().forEach(builder::addContainedKind); + builder.keyOnlyDeletes(supportsDeleteByKey); + return builder.build(); + } + + private ChangelogMode getMode(ChangelogMode requestedMode) { // if param [changelogModeEnforced] is passed in, return it directly if (changelogModeEnforced != null) { return changelogModeEnforced; @@ -2376,7 +2415,8 @@ public DynamicTableSink copy() { changelogModeEnforced, rowtimeIndex, tableSchema, - requireBucketCount); + requireBucketCount, + supportsDeleteByKey); } @Override diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java new file mode 100644 index 0000000000000..6599f64995055 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** + * Tests for verifying semantic of operations when sources produce deletes by key only and the sink + * can accept deletes by key only as well. + */ +public final class DeletesByKeyPrograms { + + /** + * Tests a simple INSERT INTO SELECT scenario where ChangelogNormalize can be eliminated since + * we don't need UPDATE_BEFORE, and we have key information for all changes. 
+ */ + public static final TableTestProgram INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY = + TableTestProgram.of( + "select-delete-on-key-to-delete-on-key", + "No ChangelogNormalize: validates results when querying source with deletes by key" + + " only, writing to sink supporting deletes by key only, which" + + " is a case where ChangelogNormalize can be eliminated") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice", 10), + Row.ofKind(RowKind.INSERT, 2, "Bob", 20), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null, null), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 2, "Bob", 30)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption( + "changelog-mode", + "I,UA,D") // Insert, UpdateAfter, Delete + .addOption("sink.supports-delete-by-key", "true") + .consumedValues( + "+I[1, Alice, 10]", + "+I[2, Bob, 20]", + "-D[1, null, null]", + "+U[2, Bob, 30]") + .build()) + .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM source_t") + .build(); + + public static final TableTestProgram INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY_WITH_PROJECTION = + TableTestProgram.of( + "select-delete-on-key-to-delete-on-key-with-projection", + "No ChangelogNormalize: validates results when querying source with deletes by key" + + " only, writing to sink supporting deletes by key only with a" + + "projection, which is a case where ChangelogNormalize can be" + + " eliminated") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING NOT NULL", + "`value` INT NOT NULL") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice", 10), + Row.ofKind(RowKind.INSERT, 2, "Bob", 20), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null, null), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 2, "Bob", 30)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption( + "changelog-mode", + "I,UA,D") // Insert, UpdateAfter, Delete + .addOption("sink.supports-delete-by-key", "true") + .consumedValues( + "+I[1, Alice, 12]", + "+I[2, Bob, 22]", + "-D[1, , -1]", + "+U[2, Bob, 32]") + .build()) + .runSql("INSERT INTO sink_t SELECT id, name, `value` + 2 FROM source_t") + .build(); + + public static final TableTestProgram INSERT_SELECT_DELETE_BY_KEY_FULL_DELETE = + TableTestProgram.of( + "select-delete-on-key-to-full-delete", + "ChangelogNormalize: validates results when querying source with deletes by key" + + " only, writing to sink supporting requiring full deletes, " + + "which is a case where ChangelogNormalize stays") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice", 10), + Row.ofKind(RowKind.INSERT, 2, "Bob", 20), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null, null), + // Update after only + 
Row.ofKind(RowKind.UPDATE_AFTER, 2, "Bob", 30)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("sink.supports-delete-by-key", "false") + .consumedValues( + "+I[1, Alice, 10]", + "+I[2, Bob, 20]", + "-D[1, Alice, 10]", + "+U[2, Bob, 30]") + .build()) + .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM source_t") + .build(); + + public static final TableTestProgram INSERT_SELECT_FULL_DELETE_FULL_DELETE = + TableTestProgram.of( + "select-full-delete-to-full-delete", + "No ChangelogNormalize: validates results when querying source with full deletes, " + + "writing to sink requiring full deletes, which is a case" + + " where ChangelogNormalize can be eliminated") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "false") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice", 10), + Row.ofKind(RowKind.INSERT, 2, "Bob", 20), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, "Alice", 10), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 2, "Bob", 30)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("sink.supports-delete-by-key", "false") + .consumedValues( + "+I[1, Alice, 10]", + "+I[2, Bob, 20]", + "-D[1, Alice, 10]", + "+U[2, Bob, 30]") + .build()) + .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM source_t") + .build(); + + public static final TableTestProgram JOIN_INTO_FULL_DELETES = + TableTestProgram.of( + "join-to-full-delete", + "ChangelogNormalize: validates results when joining sources with deletes by key" + + " only, writing to sink requiring full deletes, which" + + " is a case where ChangelogNormalize stays") + .setupTableSource( + SourceTestStep.newBuilder("left_t") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, 10), + Row.ofKind(RowKind.INSERT, 2, 20), + Row.ofKind(RowKind.INSERT, 3, 30), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 3, 40)) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("right_t") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "name STRING") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.INSERT, 3, "Emily"), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 2, "BOB")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("sink.supports-delete-by-key", "false") + .consumedValues( + "+I[1, Alice, 10]", + "+I[2, Bob, 20]", + "+I[3, Emily, 30]", + "-D[1, Alice, 10]", + "+U[3, Emily, 40]", + "+U[2, BOB, 20]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT l.id, r.name, l.`value` FROM left_t l JOIN right_t r ON l.id = 
r.id") + .build(); + + public static final TableTestProgram JOIN_INTO_DELETES_BY_KEY = + TableTestProgram.of( + "join-to-delete-on-key", + "No ChangelogNormalize: validates results when joining sources with deletes by key" + + " only, writing to sink supporting deletes by key, which" + + " is a case where ChangelogNormalize can be removed") + .setupTableSource( + SourceTestStep.newBuilder("left_t") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, 10), + Row.ofKind(RowKind.INSERT, 2, 20), + Row.ofKind(RowKind.INSERT, 3, 30), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 3, 40)) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("right_t") + .addSchema("id INT PRIMARY KEY NOT ENFORCED", "name STRING") + .addOption("changelog-mode", "I,UA,D") + .addOption("source.produces-delete-by-key", "true") + .producedValues( + Row.ofKind(RowKind.INSERT, 1, "Alice"), + Row.ofKind(RowKind.INSERT, 2, "Bob"), + Row.ofKind(RowKind.INSERT, 3, "Emily"), + // Delete by key + Row.ofKind(RowKind.DELETE, 1, null), + // Update after only + Row.ofKind(RowKind.UPDATE_AFTER, 2, "BOB")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "id INT PRIMARY KEY NOT ENFORCED", + "name STRING", + "`value` INT") + .addOption("changelog-mode", "I,UA,D") + .addOption("sink.supports-delete-by-key", "true") + .consumedValues( + "+I[1, Alice, 10]", + "+I[2, Bob, 20]", + "+I[3, Emily, 30]", + "-D[1, Alice, null]", + "+U[3, Emily, 40]", + "+U[2, BOB, 20]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT l.id, r.name, l.`value` FROM left_t l JOIN right_t r ON l.id = r.id") + .build(); + + private DeletesByKeyPrograms() {} +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java new file mode 100644 index 0000000000000..eba93c8bba9ce --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.List; + +/** Semantic tests for various {@link StreamExecNode}s and sources producing deletes by key only. 
*/ +public class DeletesByKeySemanticTests extends SemanticTestBase { + + @Override + public List programs() { + return List.of( + DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY, + DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_FULL_DELETE, + DeletesByKeyPrograms.INSERT_SELECT_FULL_DELETE_FULL_DELETE, + DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY_WITH_PROJECTION, + DeletesByKeyPrograms.JOIN_INTO_FULL_DELETES, + DeletesByKeyPrograms.JOIN_INTO_DELETES_BY_KEY); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java new file mode 100644 index 0000000000000..79f6da967e1d0 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql; + +import org.apache.flink.table.api.ExplainDetail; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** Plan tests for removal of redundant changelog normalize. 
*/ +public class ChangelogNormalizeOptimizationTest extends TableTestBase { + + private final JavaStreamTableTestUtil util = javaStreamTestUtil(); + + static List getTests() { + return Arrays.asList( + TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK), + TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK), + TestSpec.select( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SinkTable.UPSERT_SINK_FULL_DELETES), + TestSpec.select( + SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK_FULL_DELETES), + TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.RETRACT_SINK), + TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.RETRACT_SINK), + TestSpec.selectWithFilter( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.UPSERT_SINK), + TestSpec.selectWithFilter( + SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.UPSERT_SINK), + TestSpec.selectWithFilter( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, SinkTable.RETRACT_SINK), + TestSpec.selectWithFilter( + SourceTable.UPSERT_SOURCE_FULL_DELETES, SinkTable.RETRACT_SINK), + TestSpec.join( + SourceTable.UPSERT_SOURCE_FULL_DELETES, + SourceTable.UPSERT_SOURCE_FULL_DELETES, + SinkTable.UPSERT_SINK), + TestSpec.join( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SinkTable.UPSERT_SINK), + TestSpec.join( + SourceTable.UPSERT_SOURCE_FULL_DELETES, + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SinkTable.UPSERT_SINK), + TestSpec.join( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SourceTable.UPSERT_SOURCE_FULL_DELETES, + SinkTable.UPSERT_SINK), + TestSpec.join( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SinkTable.UPSERT_SINK_FULL_DELETES), + TestSpec.join( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, + SinkTable.RETRACT_SINK), + TestSpec.select( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA, + SinkTable.UPSERT_SINK_METADATA), + TestSpec.selectWithoutMetadata( + SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA, SinkTable.UPSERT_SINK)); + } + + @AfterEach + void tearDown() { + Arrays.stream(util.tableEnv().listTables()) + .forEach(t -> util.tableEnv().executeSql("DROP TABLE " + t)); + } + + @ParameterizedTest() + @MethodSource("getTests") + void testChangelogNormalizePlan(TestSpec spec) { + for (TableProperties tableProperties : spec.tablesToCreate) { + final String additionalColumns = + String.join(",\n", tableProperties.getAdditionalColumns()); + util.tableEnv() + .executeSql( + String.format( + "CREATE TABLE %s ( id INT,\n" + + " col1 INT,\n" + + " col2 STRING,\n" + + "%s" + + " PRIMARY KEY(id) NOT ENFORCED) WITH (%s)", + tableProperties.getTableName(), + StringUtils.isNullOrWhitespaceOnly(additionalColumns) + ? 
"" + : additionalColumns + ",\n", + String.join(",\n", tableProperties.getOptions()))); + } + util.verifyRelPlanInsert( + spec.query, + JavaScalaConversionUtil.toScala( + Collections.singletonList(ExplainDetail.CHANGELOG_MODE))); + } + + interface TableProperties { + + String getTableName(); + + List getOptions(); + + List getAdditionalColumns(); + } + + public enum SourceTable implements TableProperties { + UPSERT_SOURCE_PARTIAL_DELETES( + "upsert_table_partial_deletes", + "'connector' = 'values'", + "'changelog-mode' = 'UA,D'", + "'source.produces-delete-by-key'='true'"), + UPSERT_SOURCE_PARTIAL_DELETES_METADATA( + "upsert_table_partial_deletes_metadata", + List.of("`offset` BIGINT METADATA"), + "'connector' = 'values'", + "'changelog-mode' = 'UA,D'", + "'source.produces-delete-by-key'='true'", + "'readable-metadata' = 'offset:BIGINT'"), + UPSERT_SOURCE_FULL_DELETES( + "upsert_table_full_deletes", + "'connector' = 'values'", + "'changelog-mode' = 'UA,D'", + "'source.produces-delete-by-key'='false'"); + + private final String tableName; + private final List options; + private final List additionalColumns; + + SourceTable(String tableName, String... options) { + this(tableName, Collections.emptyList(), options); + } + + SourceTable(String tableName, List additionalColumns, String... options) { + this.tableName = tableName; + this.additionalColumns = additionalColumns; + this.options = Arrays.asList(options); + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public List getOptions() { + return options; + } + + @Override + public List getAdditionalColumns() { + return additionalColumns; + } + } + + public enum SinkTable implements TableProperties { + UPSERT_SINK( + "upsert_sink_table", + " 'connector' = 'values'", + "'sink.supports-delete-by-key' = 'true'", + "'sink-changelog-mode-enforced' = 'I,UA,D'"), + UPSERT_SINK_METADATA( + "upsert_sink_table", + List.of("`offset` BIGINT METADATA"), + " 'connector' = 'values'", + "'sink.supports-delete-by-key' = 'true'", + "'writable-metadata' = 'offset:BIGINT'", + "'sink-changelog-mode-enforced' = 'I,UA,D'"), + UPSERT_SINK_FULL_DELETES( + "upsert_sink_table_full_deletes", + " 'connector' = 'values'", + "'sink.supports-delete-by-key' = 'false'", + "'sink-changelog-mode-enforced' = 'I,UA,D'"), + RETRACT_SINK( + "all_change_sink_table", + "'connector' = 'values'", + "'sink-changelog-mode-enforced' = 'I,UA,UB,D'"); + + private final String tableName; + private final List options; + private final List additionalColumns; + + SinkTable(String tableName, String... options) { + this(tableName, Collections.emptyList(), options); + } + + SinkTable(String tableName, List additionalColumns, String... 
options) { + this.tableName = tableName; + this.options = Arrays.asList(options); + this.additionalColumns = additionalColumns; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public List getOptions() { + return options; + } + + @Override + public List getAdditionalColumns() { + return additionalColumns; + } + } + + private static class TestSpec { + + private final Set tablesToCreate; + private final String query; + private final String description; + + private TestSpec(String description, Set tablesToCreate, String query) { + this.tablesToCreate = tablesToCreate; + this.query = query; + this.description = description; + } + + public static TestSpec selectWithoutMetadata(SourceTable sourceTable, SinkTable sinkTable) { + return new TestSpec( + String.format( + "select_no_metadata_%s_into_%s", + sourceTable.getTableName(), sinkTable.getTableName()), + new HashSet<>(Arrays.asList(sourceTable, sinkTable)), + String.format( + "INSERT INTO %s SELECT id, col1, col2 FROM %s", + sinkTable.getTableName(), sourceTable.getTableName())); + } + + public static TestSpec select(SourceTable sourceTable, SinkTable sinkTable) { + return new TestSpec( + String.format( + "select_%s_into_%s", + sourceTable.getTableName(), sinkTable.getTableName()), + new HashSet<>(Arrays.asList(sourceTable, sinkTable)), + String.format( + "INSERT INTO %s SELECT * FROM %s", + sinkTable.getTableName(), sourceTable.getTableName())); + } + + public static TestSpec selectWithFilter(SourceTable sourceTable, SinkTable sinkTable) { + return new TestSpec( + String.format( + "select_with_filter_%s_into_%s", + sourceTable.getTableName(), sinkTable.getTableName()), + new HashSet<>(Arrays.asList(sourceTable, sinkTable)), + String.format( + "INSERT INTO %s SELECT * FROM %s WHERE col1 > 2", + sinkTable.getTableName(), sourceTable.getTableName())); + } + + public static TestSpec join( + SourceTable leftTable, SourceTable rightTable, SinkTable sinkTable) { + return new TestSpec( + String.format( + "join_%s_%s_into_%s", + leftTable.getTableName(), + rightTable.getTableName(), + sinkTable.getTableName()), + new HashSet<>(Arrays.asList(leftTable, rightTable, sinkTable)), + String.format( + "INSERT INTO %s SELECT l.* FROM %s l JOIN %s r ON l.id = r.id", + sinkTable.getTableName(), + leftTable.getTableName(), + rightTable.getTableName())); + } + + @Override + public String toString() { + return description; + } + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index c0c769a698aec..f876e815b5150 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -577,6 +577,7 @@ void testFromAndToChangelogStreamUpsert() throws Exception { tableEnv.fromChangelogStream( changelogStream, Schema.newBuilder().primaryKey("f0").build(), + // produce partial deletes ChangelogMode.upsert())); final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t GROUP BY f0"); @@ -585,7 +586,8 @@ void testFromAndToChangelogStreamUpsert() throws Exception { tableEnv.toChangelogStream( result, Schema.newBuilder().primaryKey("f0").build(), - ChangelogMode.upsert()), + // expect full deletes, therefore, require 
changelog normalize + ChangelogMode.upsert(false)), getOutput(inputOrOutput)); } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml new file mode 100644 index 0000000000000..c15ac5f8d621a --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml @@ -0,0 +1,418 @@ + + + + + + 2]]> + + + ($1, 2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, upsert_table_full_deletes]]) +]]> + + + (col1, 2)], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2], changelogMode=[UA,D]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 2]]> + + + ($1, 2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, upsert_table_partial_deletes]]) +]]> + + + (col1, 2)], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD]) + +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_partial_deletes, filter=[]]], fields=[id, col1, col2], changelogMode=[UA,PD]) +]]> + + + + + 2]]> + + + ($1, 2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, upsert_table_full_deletes]]) +]]> + + + (col1, 2)], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2], changelogMode=[UA,D]) +]]> + + + + + 2]]> + + + ($1, 2)]) + +- LogicalTableScan(table=[[default_catalog, default_database, upsert_table_partial_deletes]]) +]]> + + + (col1, 2)], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD]) + +- TableSourceScan(table=[[default_catalog, default_database, upsert_table_partial_deletes, filter=[]]], fields=[id, col1, col2], changelogMode=[UA,PD]) +]]> + + + diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index 04775282ec162..52db3da571ce3 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -333,7 +333,7 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3], changelogMode=[ : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency, rowtime], changelogMode=[I]) +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D]) + +- 
TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,PD]) ]]> @@ -486,8 +486,8 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS EXPR$3], changelogMode=[ : +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[amount, currency], changelogMode=[I]) +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D]) +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D]) - +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate], changelogMode=[UA,D]) + +- Exchange(distribution=[hash[currency]], changelogMode=[UA,PD]) + +- TableSourceScan(table=[[default_catalog, default_database, rates_history]], fields=[currency, rate], changelogMode=[UA,PD]) ]]> @@ -658,8 +658,8 @@ LogicalProject(b=[$2], ts=[$0], a=[$1]) Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D]) :- Calc(select=[b, ts, CAST(a AS INTEGER) AS a], changelogMode=[I,UA,D]) : +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D]) -: +- Exchange(distribution=[hash[a]], changelogMode=[UA,D]) -: +- TableSourceScan(table=[[default_catalog, default_database, upsert_src]], fields=[ts, a, b], changelogMode=[UA,D]) +: +- Exchange(distribution=[hash[a]], changelogMode=[UA,PD]) +: +- TableSourceScan(table=[[default_catalog, default_database, upsert_src]], fields=[ts, a, b], changelogMode=[UA,PD]) +- Calc(select=[b, t AS ts, a], changelogMode=[I,UA]) +- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], changelogMode=[I,UA]) +- Exchange(distribution=[hash[a]], changelogMode=[I]) @@ -744,8 +744,8 @@ LogicalProject(id=[$0], ts=[$4]) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala index 3dc7bb8a66666..2ba440b7fd1d2 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala @@ -95,7 +95,8 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp | primary key (a) not enforced |) with ( | 'connector' = 'values', - | 'changelog-mode' = 'I,UA,D' + | 'changelog-mode' = 'I,UA,D', + | 'source.produces-delete-by-key' = 'true' |)""".stripMargin) util.tableEnv.executeSql(""" |create temporary table upsert_src_with_meta ( @@ -109,6 +110,7 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp |) with ( | 'connector' = 'values', | 'changelog-mode' = 'I,UA,D', + | 'source.produces-delete-by-key' = 'true', | 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING' |)""".stripMargin) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index eb533cbe1c0ef..ea905662a4221 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -492,6 +492,7 @@ class TableScanTest extends 
TableTestBase { | 'connector' = 'values', | 'changelog-mode' = 'UA,D', | 'enable-watermark-push-down' = 'true', + | 'source.produces-delete-by-key' = 'true', | 'disable-lookup' = 'true' |) """.stripMargin) @@ -508,7 +509,8 @@ class TableScanTest extends TableTestBase { | PRIMARY KEY (a) NOT ENFORCED |) WITH ( | 'connector' = 'values', - | 'changelog-mode' = 'UA,D' + | 'changelog-mode' = 'UA,D', + | 'source.produces-delete-by-key' = 'true' |) """.stripMargin) util.addTable(""" @@ -593,6 +595,7 @@ class TableScanTest extends TableTestBase { |) WITH ( | 'connector' = 'values', | 'changelog-mode' = 'UA,D', + | 'source.produces-delete-by-key' = 'true', | 'disable-lookup' = 'true' |) """.stripMargin) @@ -628,6 +631,7 @@ class TableScanTest extends TableTestBase { |) WITH ( | 'connector' = 'values', | 'changelog-mode' = 'UA,D', + | 'source.produces-delete-by-key' = 'true', | 'disable-lookup' = 'true' |) """.stripMargin) @@ -786,7 +790,8 @@ class TableScanTest extends TableTestBase { | 'runtime-source' = 'DataStream', | 'scan.parallelism' = '5', | 'enable-projection-push-down' = 'false', - | 'changelog-mode' = 'I,UA,D' + | 'changelog-mode' = 'I,UA,D', + | 'source.produces-delete-by-key' = 'true' |) """.stripMargin) util.addTable(""" diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 2f4c576bc7cb6..6dc54d54ee310 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -90,8 +90,10 @@ import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier} import org.apache.calcite.sql.parser.SqlParserPos import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType, fail} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.TestTemplate import org.junit.jupiter.api.extension.{BeforeEachCallback, ExtendWith, ExtensionContext, RegisterExtension} import org.junit.jupiter.api.io.TempDir +import org.junit.jupiter.params.ParameterizedTest import org.junit.platform.commons.support.AnnotationSupport import java.io.{File, IOException} @@ -155,7 +157,14 @@ class TestName extends BeforeEachCallback { } methodName = s"${context.getTestMethod.get().getName}$displayName" } else { - methodName = context.getTestMethod.get().getName + if ( + AnnotationSupport.isAnnotated(context.getTestMethod, classOf[ParameterizedTest]) + || AnnotationSupport.isAnnotated(context.getTestMethod, classOf[TestTemplate]) + ) { + methodName = s"${context.getTestMethod.get().getName}[${context.getDisplayName}]" + } else { + methodName = context.getTestMethod.get().getName + } } }
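
For connector authors reading this change, a minimal sketch of how the two upsert variants exercised by the tests above could be declared through the public ChangelogMode builder API. Only the ChangelogMode/RowKind calls come from this patch; the helper class and method names are illustrative:

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    /** Illustrative helper, not part of the patch. */
    public final class UpsertModes {

        /** Same mode as ChangelogMode.upsert(true): DELETE rows may carry only the key fields. */
        public static ChangelogMode keyOnlyDeleteUpsert() {
            return ChangelogMode.newBuilder()
                    .addContainedKind(RowKind.INSERT)
                    .addContainedKind(RowKind.UPDATE_AFTER)
                    .addContainedKind(RowKind.DELETE)
                    .keyOnlyDeletes(true)
                    .build();
        }

        /** Same mode as ChangelogMode.upsert(false): DELETE rows must carry the full row. */
        public static ChangelogMode fullDeleteUpsert() {
            return ChangelogMode.upsert(false);
        }

        private UpsertModes() {}
    }

A source that reports the key-only variant tells the planner that non-key fields of DELETE rows cannot be relied on downstream, which is the property the new DeleteKind trait tracks when deciding whether ChangelogNormalize can be dropped.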
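
The two new test-factory options can also be tried end to end. A hypothetical snippet (table names and schema are made up, and it assumes the patched TestValuesTableFactory from the test scope is on the classpath) that makes the removed ChangelogNormalize visible in the explain output:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.ExplainDetail;
    import org.apache.flink.table.api.TableEnvironment;

    public final class DeleteByKeyExplainExample {

        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Upsert source whose DELETE rows carry only the primary key.
            env.executeSql(
                    "CREATE TABLE src (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ("
                            + " 'connector' = 'values',"
                            + " 'changelog-mode' = 'I,UA,D',"
                            + " 'source.produces-delete-by-key' = 'true')");

            // Upsert sink that accepts key-only deletes, so no ChangelogNormalize is required.
            env.executeSql(
                    "CREATE TABLE snk (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ("
                            + " 'connector' = 'values',"
                            + " 'sink-changelog-mode-enforced' = 'I,UA,D',"
                            + " 'sink.supports-delete-by-key' = 'true')");

            // Per-operator changelog modes (including the PD marker for key-only deletes)
            // appear when explaining with the CHANGELOG_MODE detail.
            System.out.println(
                    env.explainSql(
                            "INSERT INTO snk SELECT * FROM src", ExplainDetail.CHANGELOG_MODE));
        }

        private DeleteByKeyExplainExample() {}
    }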