Commit 40b9052

[FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink
* Add information about DELETE to ChangelogMode
* Adapt FlinkChangelogModeInferenceProgram to remove ChangelogNormalize if possible
1 parent b1c3209 commit 40b9052
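
To make the intent concrete: ChangelogNormalize materializes previous rows so that key-only upsert streams can be turned into complete change streams, so it only becomes redundant when the source already delivers full DELETE rows. A minimal sketch of that reasoning against the new API introduced below (the helper class and method name here are hypothetical, not part of the commit):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class DropNormalizeCheck {

    /**
     * Hypothetical illustration only: normalization is still needed for DELETE
     * handling when the source emits DELETEs that carry just the key, because
     * downstream operators may need the full previous row.
     */
    static boolean normalizeNeededForDeletes(ChangelogMode sourceMode) {
        return sourceMode.contains(RowKind.DELETE) && sourceMode.keyOnlyDeletes();
    }

    public static void main(String[] args) {
        System.out.println(normalizeNeededForDeletes(ChangelogMode.upsert()));      // true
        System.out.println(normalizeNeededForDeletes(ChangelogMode.upsert(false))); // false
    }
}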

File tree

19 files changed: +1969 -79 lines changed

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java

Lines changed: 46 additions & 4 deletions
@@ -26,6 +26,7 @@
 import java.util.EnumSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * The set of changes contained in a changelog.
@@ -43,6 +44,15 @@ public final class ChangelogMode {
                     .addContainedKind(RowKind.INSERT)
                     .addContainedKind(RowKind.UPDATE_AFTER)
                     .addContainedKind(RowKind.DELETE)
+                    .keyOnlyDeletes(true)
+                    .build();
+
+    private static final ChangelogMode UPSERT_WITH_FULL_DELETES =
+            ChangelogMode.newBuilder()
+                    .addContainedKind(RowKind.INSERT)
+                    .addContainedKind(RowKind.UPDATE_AFTER)
+                    .addContainedKind(RowKind.DELETE)
+                    .keyOnlyDeletes(false)
                     .build();
 
     private static final ChangelogMode ALL =
@@ -54,11 +64,13 @@ public final class ChangelogMode {
                     .build();
 
     private final Set<RowKind> kinds;
+    private final boolean keyOnlyDeletes;
 
-    private ChangelogMode(Set<RowKind> kinds) {
+    private ChangelogMode(Set<RowKind> kinds, boolean keyOnlyDeletes) {
         Preconditions.checkArgument(
                 kinds.size() > 0, "At least one kind of row should be contained in a changelog.");
         this.kinds = Collections.unmodifiableSet(kinds);
+        this.keyOnlyDeletes = keyOnlyDeletes;
     }
 
     /** Shortcut for a simple {@link RowKind#INSERT}-only changelog. */
@@ -71,7 +83,21 @@ public static ChangelogMode insertOnly() {
      * contain {@link RowKind#UPDATE_BEFORE} rows.
      */
     public static ChangelogMode upsert() {
-        return UPSERT;
+        return upsert(true);
+    }
+
+    /**
+     * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does not
+     * contain {@link RowKind#UPDATE_BEFORE} rows.
+     *
+     * @param keyOnlyDeletes Tells the system the DELETEs contain just the key.
+     */
+    public static ChangelogMode upsert(boolean keyOnlyDeletes) {
+        if (keyOnlyDeletes) {
+            return UPSERT;
+        } else {
+            return UPSERT_WITH_FULL_DELETES;
+        }
     }
 
     /** Shortcut for a changelog that can contain all {@link RowKind}s. */
@@ -96,6 +122,10 @@ public boolean containsOnly(RowKind kind) {
         return kinds.size() == 1 && kinds.contains(kind);
     }
 
+    public boolean keyOnlyDeletes() {
+        return keyOnlyDeletes;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -115,7 +145,13 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        return kinds.toString();
+        if (!keyOnlyDeletes) {
+            return kinds.toString();
+        } else {
+            return kinds.stream()
+                    .map(kind -> kind == RowKind.DELETE ? "~D" : kind.toString())
+                    .collect(Collectors.joining(", ", "[", "]"));
+        }
     }
 
     // --------------------------------------------------------------------------------------------
@@ -125,6 +161,7 @@ public String toString() {
     public static class Builder {
 
         private final Set<RowKind> kinds = EnumSet.noneOf(RowKind.class);
+        private boolean keyOnlyDeletes = false;
 
         private Builder() {
             // default constructor to allow a fluent definition
@@ -135,8 +172,13 @@ public Builder addContainedKind(RowKind kind) {
             return this;
         }
 
+        public Builder keyOnlyDeletes(boolean flag) {
+            this.keyOnlyDeletes = flag;
+            return this;
+        }
+
         public ChangelogMode build() {
-            return new ChangelogMode(kinds);
+            return new ChangelogMode(kinds, keyOnlyDeletes);
        }
    }
}
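
A quick standalone sketch of the new builder flag and the adjusted toString(); the expected output in the comments follows from the diff above:

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class ChangelogModeExample {
    public static void main(String[] args) {
        // Equivalent to ChangelogMode.upsert(): DELETEs may carry only the key.
        ChangelogMode keyOnly =
                ChangelogMode.newBuilder()
                        .addContainedKind(RowKind.INSERT)
                        .addContainedKind(RowKind.UPDATE_AFTER)
                        .addContainedKind(RowKind.DELETE)
                        .keyOnlyDeletes(true)
                        .build();
        // Key-only DELETEs render as "~D" in the new toString().
        System.out.println(keyOnly);                     // [INSERT, UPDATE_AFTER, ~D]
        System.out.println(ChangelogMode.upsert(false)); // [INSERT, UPDATE_AFTER, DELETE]
    }
}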
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java

Lines changed: 179 additions & 0 deletions

@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Checks if it is safe to remove ChangelogNormalize as part of {@link
+ * FlinkChangelogModeInferenceProgram}. It checks:
+ *
+ * <ul>
+ *   <li>if there is no filter pushed into the changelog normalize
+ *   <li>if we don't need to produce UPDATE_BEFORE
+ *   <li>we don't access any metadata columns
+ * </ul>
+ */
+public class ChangelogNormalizeRequirementResolver {
+
+    /** Checks if it is safe to remove ChangelogNormalize. */
+    public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) {
+        if (normalize.filterCondition() != null) {
+            return true;
+        }
+        if (!Objects.equals(
+                normalize.getTraitSet().getTrait(UpdateKindTraitDef$.MODULE$.INSTANCE()),
+                UpdateKindTrait$.MODULE$.ONLY_UPDATE_AFTER())) {
+            return true;
+        }
+
+        // check if metadata columns are accessed
+        final RelNode input = normalize.getInput();
+
+        return visit(input, bitSetForAllOutputColumns(input));
+    }
+
+    private static ImmutableBitSet bitSetForAllOutputColumns(RelNode input) {
+        return ImmutableBitSet.builder().set(0, input.getRowType().getFieldCount()).build();
+    }
+
+    private static boolean visit(final RelNode rel, final ImmutableBitSet requiredColumns) {
+        if (rel instanceof StreamPhysicalCalcBase) {
+            return visitCalc((StreamPhysicalCalcBase) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalExchange) {
+            // require all input columns
+            final RelNode input = ((SingleRel) rel).getInput();
+            return visit(input, bitSetForAllOutputColumns(input));
+        } else {
+            // these nodes should not be in an input of a changelog normalize
+            // StreamPhysicalChangelogNormalize
+            // StreamPhysicalDropUpdateBefore
+            // StreamPhysicalUnion
+            // StreamPhysicalSort
+            // StreamPhysicalLimit
+            // StreamPhysicalSortLimit
+            // StreamPhysicalTemporalSort
+            // StreamPhysicalWindowTableFunction
+            // StreamPhysicalWindowRank
+            // StreamPhysicalWindowDeduplicate
+            // StreamPhysicalRank
+            // StreamPhysicalOverAggregateBase
+            // CommonPhysicalJoin
+            // StreamPhysicalMatch
+            // StreamPhysicalMiniBatchAssigner
+            // StreamPhysicalExpand
+            // StreamPhysicalWindowAggregateBase
+            // StreamPhysicalGroupAggregateBase
+            // StreamPhysicalSink
+            // StreamPhysicalLegacySink
+            // StreamPhysicalCorrelateBase
+            // StreamPhysicalLookupJoin
+            // StreamPhysicalValues
+            // StreamPhysicalDataStreamScan
+            // StreamPhysicalLegacyTableSourceScan
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported to visit node %s. The node either should not be pushed"
+                                    + " through the changelog normalize or is not supported yet.",
+                            rel.getClass().getSimpleName()));
+        }
+    }
+
+    private static boolean visitTableSourceScan(
+            StreamPhysicalTableSourceScan tableScan, ImmutableBitSet requiredColumns) {
+        if (!(tableScan.tableSource() instanceof SupportsReadingMetadata)) {
+            // source does not have metadata, no need to check
+            return false;
+        }
+        final TableSourceTable sourceTable = tableScan.getTable().unwrap(TableSourceTable.class);
+        assert sourceTable != null;
+        // check if requiredColumns contain metadata column
+        final List<Column.MetadataColumn> metadataColumns =
+                DynamicSourceUtils.extractMetadataColumns(
+                        sourceTable.contextResolvedTable().getResolvedSchema());
+        final Set<String> metaColumnSet =
+                metadataColumns.stream().map(Column::getName).collect(Collectors.toSet());
+        final List<String> columns = tableScan.getRowType().getFieldNames();
+        for (int index = 0; index < columns.size(); index++) {
+            String column = columns.get(index);
+            if (metaColumnSet.contains(column) && requiredColumns.get(index)) {
+                // we require metadata column, therefore, we cannot remove the changelog normalize
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static boolean visitCalc(StreamPhysicalCalcBase calc, ImmutableBitSet requiredColumns) {
+        // evaluate required columns from input
+        final List<RexNode> projects =
+                calc.getProgram().getProjectList().stream()
+                        .map(expr -> calc.getProgram().expandLocalRef(expr))
+                        .collect(Collectors.toList());
+        final Map<Integer, List<Integer>> outFromSourcePos =
+                FlinkRelUtil.extractSourceMapping(projects);
+        final List<Integer> conv2Inputs =
+                requiredColumns.toList().stream()
+                        .map(
+                                out ->
+                                        Optional.ofNullable(outFromSourcePos.get(out))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new IllegalStateException(
+                                                                        String.format(
+                                                                                "Invalid pos:%d over projection:%s",
+                                                                                out,
+                                                                                calc
+                                                                                        .getProgram()))))
+                        .flatMap(Collection::stream)
+                        .filter(index -> index != -1)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return visit(calc.getInput(), ImmutableBitSet.of(conv2Inputs));
+    }
+}
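
The resolver is consulted from the Scala FlinkChangelogModeInferenceProgram, which is not part of this excerpt. A hedged Java sketch of the intended contract only (the caller class and method are hypothetical):

import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver;

import org.apache.calcite.rel.RelNode;

/** Hypothetical caller illustrating how isRequired() is meant to be used. */
public class DropNormalizeSketch {

    /** Returns the node's input when the normalize step is provably redundant. */
    public static RelNode tryDrop(StreamPhysicalChangelogNormalize normalize) {
        if (ChangelogNormalizeRequirementResolver.isRequired(normalize)) {
            // Pushed-down filter, UPDATE_BEFORE requirement, or metadata access:
            // the normalize node must stay.
            return normalize;
        }
        // Safe to drop: the upsert stream can flow to the sink unchanged.
        return normalize.getInput();
    }
}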

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java

Lines changed: 3 additions & 18 deletions
@@ -62,6 +62,7 @@
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.plan.utils.JoinUtil;
 import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
@@ -78,7 +79,6 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.ImmutableBitSet;
 
@@ -297,7 +297,8 @@ private StreamPhysicalRel visitCalc(
                 calc.getProgram().getProjectList().stream()
                         .map(expr -> calc.getProgram().expandLocalRef(expr))
                         .collect(Collectors.toList());
-        Map<Integer, List<Integer>> outFromSourcePos = extractSourceMapping(projects);
+        Map<Integer, List<Integer>> outFromSourcePos =
+                FlinkRelUtil.extractSourceMapping(projects);
         List<Integer> conv2Inputs =
                 requireDeterminism.toList().stream()
                         .map(
@@ -812,22 +813,6 @@ private StreamPhysicalRel visitJoinChild(
         return transmitDeterminismRequirement(rel, inputRequireDeterminism);
     }
 
-    /** Extracts the out from source field index mapping of the given projects. */
-    private Map<Integer, List<Integer>> extractSourceMapping(final List<RexNode> projects) {
-        Map<Integer, List<Integer>> mapOutFromInPos = new HashMap<>();
-
-        for (int index = 0; index < projects.size(); index++) {
-            RexNode expr = projects.get(index);
-            mapOutFromInPos.put(
-                    index,
-                    FlinkRexUtil.findAllInputRefs(expr).stream()
-                            .mapToInt(RexSlot::getIndex)
-                            .boxed()
-                            .collect(Collectors.toList()));
-        }
-        return mapOutFromInPos;
-    }
-
     private void checkNonDeterministicRexProgram(
             final ImmutableBitSet requireDeterminism,
             final RexProgram program,
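
The removed private helper was not lost but relocated: the call site above now goes through FlinkRelUtil.extractSourceMapping, shared with ChangelogNormalizeRequirementResolver. Its new home is not shown in this excerpt; mirroring the deleted method, it presumably looks like this sketch (the wrapper class name here is hypothetical):

import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;

import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexSlot;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch of the relocated helper, reconstructed from the deleted private method. */
public final class FlinkRelUtilSketch {

    /** Extracts the out-from-source field index mapping of the given projects. */
    public static Map<Integer, List<Integer>> extractSourceMapping(final List<RexNode> projects) {
        final Map<Integer, List<Integer>> mapOutFromInPos = new HashMap<>();
        for (int index = 0; index < projects.size(); index++) {
            final RexNode expr = projects.get(index);
            // map each output position to the input field indices it references
            mapOutFromInPos.put(
                    index,
                    FlinkRexUtil.findAllInputRefs(expr).stream()
                            .mapToInt(RexSlot::getIndex)
                            .boxed()
                            .collect(Collectors.toList()));
        }
        return mapOutFromInPos;
    }
}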
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.trait;
+
+/** Lists all kinds of {@link ModifyKind#DELETE} operation. */
+public enum DeleteKind {
+
+    /** This kind indicates that operators do not emit {@link ModifyKind#DELETE} operation. */
+    NONE,
+
+    /**
+     * This kind indicates that operators can emit deletes with the key only. The rest of the row
+     * may be not present.
+     */
+    DELETE_BY_KEY,
+
+    /** This kind indicates that operators should emit deletes with the full row. */
+    FULL_DELETE
+}
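
How the trait is derived from a connector's ChangelogMode is not shown in this excerpt; a plausible mapping, sketched under that assumption (the helper class is hypothetical, and the planner's real trait derivation lives in the Scala inference program):

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.planner.plan.trait.DeleteKind;
import org.apache.flink.types.RowKind;

/** Hypothetical helper relating the new ChangelogMode flag to DeleteKind. */
public final class DeleteKindMapping {

    public static DeleteKind fromChangelogMode(ChangelogMode mode) {
        if (!mode.contains(RowKind.DELETE)) {
            return DeleteKind.NONE; // no DELETEs emitted at all
        }
        // Key-only DELETEs map to DELETE_BY_KEY, full rows to FULL_DELETE.
        return mode.keyOnlyDeletes() ? DeleteKind.DELETE_BY_KEY : DeleteKind.FULL_DELETE;
    }
}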
