Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3f18260

Browse files
committedMar 17, 2025
[FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink
* Add information about DELETE to ChangelogMode * Adapt FlinkChangelogModeInferenceProgram to remove ChangelogNormalize if possible
1 parent 452b648 commit 3f18260

File tree

11 files changed

+1592
-46
lines changed

11 files changed

+1592
-46
lines changed
 

‎flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public final class ChangelogMode {
4343
.addContainedKind(RowKind.INSERT)
4444
.addContainedKind(RowKind.UPDATE_AFTER)
4545
.addContainedKind(RowKind.DELETE)
46+
.deletesOnKey(true)
4647
.build();
4748

4849
private static final ChangelogMode ALL =
@@ -54,11 +55,13 @@ public final class ChangelogMode {
5455
.build();
5556

5657
private final Set<RowKind> kinds;
58+
private final boolean deletesOnKey;
5759

58-
private ChangelogMode(Set<RowKind> kinds) {
60+
/**
 * Creates a changelog mode from the given row kinds.
 *
 * @param kinds the row kinds contained in the changelog; must contain at least one kind
 * @param deletesOnKey the flag later reported by {@link #deletesOnKey()}
 * @throws IllegalArgumentException if {@code kinds} is empty
 */
private ChangelogMode(Set<RowKind> kinds, boolean deletesOnKey) {
    Preconditions.checkArgument(
            kinds.size() > 0, "At least one kind of row should be contained in a changelog.");
    // Snapshot the set instead of wrapping the caller's instance: the Builder hands over its
    // own mutable EnumSet, so a plain unmodifiable view would still change if that set is
    // modified after build().
    this.kinds = Collections.unmodifiableSet(EnumSet.copyOf(kinds));
    this.deletesOnKey = deletesOnKey;
}
6366

6467
/** Shortcut for a simple {@link RowKind#INSERT}-only changelog. */
@@ -96,6 +99,10 @@ public boolean containsOnly(RowKind kind) {
9699
return kinds.size() == 1 && kinds.contains(kind);
97100
}
98101

102+
public boolean deletesOnKey() {
103+
return deletesOnKey;
104+
}
105+
99106
@Override
100107
public boolean equals(Object o) {
101108
if (this == o) {
@@ -125,6 +132,7 @@ public String toString() {
125132
public static class Builder {
126133

127134
private final Set<RowKind> kinds = EnumSet.noneOf(RowKind.class);
135+
private boolean deletesOnKey = false;
128136

129137
private Builder() {
130138
// default constructor to allow a fluent definition
@@ -135,8 +143,13 @@ public Builder addContainedKind(RowKind kind) {
135143
return this;
136144
}
137145

146+
public Builder deletesOnKey(boolean flag) {
147+
this.deletesOnKey = flag;
148+
return this;
149+
}
150+
138151
public ChangelogMode build() {
139-
return new ChangelogMode(kinds);
152+
return new ChangelogMode(kinds, deletesOnKey);
140153
}
141154
}
142155
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.trait;
20+
21+
/** Lists all kinds of {@link ModifyKind#DELETE} operation. */
public enum DeleteKind {

    /** Indicates that operators do not emit any {@link ModifyKind#DELETE} operation. */
    NONE,

    /**
     * Indicates that operators can emit deletes that carry the key only. The rest of the row may
     * not be present.
     */
    DELETE_ON_KEY,

    /** Indicates that operators should emit deletes that carry the full row. */
    FULL_DELETE
}

‎flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

+371-38
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.table.planner.plan.`trait`
19+
20+
import org.apache.flink.table.connector.ChangelogMode
21+
import org.apache.flink.types.RowKind
22+
23+
import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
24+
25+
/** A [[RelTrait]] that describes the kind of delete operation through its [[DeleteKind]]. */
class DeleteKindTrait(val deleteKind: DeleteKind) extends RelTrait {

  /** Satisfied only by another [[DeleteKindTrait]] whose delete kind matches exactly. */
  override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
    case that: DeleteKindTrait => that.deleteKind == deleteKind
    case _ => false
  }

  override def getTraitDef: RelTraitDef[_ <: RelTrait] = DeleteKindTraitDef.INSTANCE

  /** Intentionally empty: nothing is registered with the planner. */
  override def register(planner: RelOptPlanner): Unit = ()

  /** Hash code of the wrapped delete kind, consistent with [[equals]]. */
  override def hashCode(): Int = deleteKind.hashCode()

  /** Equal to any other [[DeleteKindTrait]] wrapping an equal delete kind. */
  override def equals(obj: Any): Boolean = obj match {
    case that: DeleteKindTrait => deleteKind.equals(that.deleteKind)
    case _ => false
  }

  /** Renders the delete kind name wrapped in square brackets. */
  override def toString: String = "[" + deleteKind.toString + "]"
}
48+
49+
object DeleteKindTrait {
50+
51+
/** An [[DeleteKindTrait]] that describes the node does not support delete operation. */
52+
val NONE = new DeleteKindTrait(DeleteKind.NONE)
53+
54+
/** An [[DeleteKindTrait]] that describes the node supports deletes on key only. */
55+
val DELETE_ON_KEY = new DeleteKindTrait(DeleteKind.DELETE_ON_KEY)
56+
57+
/** An [[DeleteKindTrait]] that describes the node produces requires deletes by full records. */
58+
val FULL_DELETE = new DeleteKindTrait(DeleteKind.FULL_DELETE)
59+
60+
/**
61+
* Returns DELETE_ON_KEY [[DeleteKindTrait]] if there is delete changes. Otherwise, returns NONE
62+
* [[DeleteKindTrait]].
63+
*/
64+
def deleteOnKeyOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = {
65+
val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) {
66+
DeleteKind.DELETE_ON_KEY
67+
} else {
68+
DeleteKind.NONE
69+
}
70+
new DeleteKindTrait(deleteKind)
71+
}
72+
73+
/**
74+
* Returns FULL_DELETE [[DeleteKindTrait]] if there is delete changes. Otherwise, returns NONE
75+
* [[DeleteKindTrait]].
76+
*/
77+
def fullDeleteOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = {
78+
val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) {
79+
DeleteKind.FULL_DELETE
80+
} else {
81+
DeleteKind.NONE
82+
}
83+
new DeleteKindTrait(deleteKind)
84+
}
85+
86+
/** Creates an instance of [[DeleteKindTrait]] from the given [[ChangelogMode]]. */
87+
def fromChangelogMode(changelogMode: ChangelogMode): DeleteKindTrait = {
88+
val hasDelete = changelogMode.contains(RowKind.DELETE)
89+
if (!hasDelete) {
90+
DeleteKindTrait.NONE
91+
} else {
92+
val hasDeleteOnKey = changelogMode.deletesOnKey()
93+
if (hasDeleteOnKey) {
94+
DeleteKindTrait.DELETE_ON_KEY
95+
} else {
96+
DeleteKindTrait.FULL_DELETE
97+
}
98+
}
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.table.planner.plan.`trait`
19+
20+
import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef}
21+
import org.apache.calcite.rel.RelNode
22+
23+
/** The [[RelTraitDef]] for [[DeleteKindTrait]]. */
class DeleteKindTraitDef extends RelTraitDef[DeleteKindTrait] {

  override def getTraitClass: Class[DeleteKindTrait] = classOf[DeleteKindTrait]

  /** Uses the simple name of the runtime class as the name of this trait definition. */
  override def getSimpleName: String = getClass.getSimpleName

  /**
   * Copies `rel` with its current inputs and with a trait set obtained by adding `toTrait` to
   * the trait set of `rel`. The `planner` and `allowInfiniteCostConverters` arguments are not
   * used.
   */
  override def convert(
      planner: RelOptPlanner,
      rel: RelNode,
      toTrait: DeleteKindTrait,
      allowInfiniteCostConverters: Boolean): RelNode = {
    val convertedTraits = rel.getTraitSet.plus(toTrait)
    rel.copy(convertedTraits, rel.getInputs)
  }

  /** Always throws: conversion between delete kind traits is not supported. */
  override def canConvert(
      planner: RelOptPlanner,
      fromTrait: DeleteKindTrait,
      toTrait: DeleteKindTrait): Boolean =
    throw new UnsupportedOperationException("DeleteKindTrait conversion is not supported for now.")

  /** The default trait is [[DeleteKindTrait.FULL_DELETE]]. */
  override def getDefault: DeleteKindTrait = DeleteKindTrait.FULL_DELETE
}
46+
47+
object DeleteKindTraitDef {

  /** The shared [[DeleteKindTraitDef]] instance. */
  val INSTANCE: DeleteKindTraitDef = new DeleteKindTraitDef
}

‎flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java

+45-5
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,22 @@ private static RowKind parseRowKind(String rowKindShortString) {
457457
.withDescription(
458458
"Option to determine whether or not to require the distribution bucket count");
459459

460+
private static final ConfigOption<Boolean> SINK_SUPPORTS_DELETE_ON_KEY =
461+
ConfigOptions.key("sink.supports-delete-on-key")
462+
.booleanType()
463+
.defaultValue(false)
464+
.withDescription(
465+
"Option to determine whether or not to require deletes to have the"
466+
+ " entire row or is a delete by key sufficient.");
467+
468+
private static final ConfigOption<Boolean> SOURCE_PRODUCES_DELETE_ON_KEY =
469+
ConfigOptions.key("source.produces-delete-on-key")
470+
.booleanType()
471+
.defaultValue(false)
472+
.withDescription(
473+
"Option to determine whether or not to require deletes to have the"
474+
+ " entire row or is a delete by key sufficient.");
475+
460476
private static final ConfigOption<Integer> SOURCE_NUM_ELEMENT_TO_SKIP =
461477
ConfigOptions.key("source.num-element-to-skip")
462478
.intType()
@@ -504,7 +520,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
504520

505521
helper.validate();
506522

507-
ChangelogMode changelogMode = parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
523+
ChangelogMode changelogMode =
524+
parseChangelogMode(
525+
helper.getOptions().get(CHANGELOG_MODE),
526+
helper.getOptions().get(SOURCE_PRODUCES_DELETE_ON_KEY));
508527
String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
509528
boolean isBounded = helper.getOptions().get(BOUNDED);
510529
boolean isFinite = helper.getOptions().get(TERMINATING);
@@ -749,6 +768,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
749768
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
750769

751770
boolean requireBucketCount = helper.getOptions().get(SINK_BUCKET_COUNT_REQUIRED);
771+
boolean supportsDeleteByKey = helper.getOptions().get(SINK_SUPPORTS_DELETE_ON_KEY);
752772
if (sinkClass.equals("DEFAULT")) {
753773
int rowTimeIndex =
754774
validateAndExtractRowtimeIndex(
@@ -765,7 +785,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
765785
changelogMode,
766786
rowTimeIndex,
767787
tableSchema,
768-
requireBucketCount);
788+
requireBucketCount,
789+
supportsDeleteByKey);
769790
} else {
770791
try {
771792
return InstantiationUtil.instantiate(
@@ -816,6 +837,8 @@ public Set<ConfigOption<?>> optionalOptions() {
816837
ENABLE_WATERMARK_PUSH_DOWN,
817838
SINK_DROP_LATE_EVENT,
818839
SINK_BUCKET_COUNT_REQUIRED,
840+
SINK_SUPPORTS_DELETE_ON_KEY,
841+
SOURCE_PRODUCES_DELETE_ON_KEY,
819842
SOURCE_NUM_ELEMENT_TO_SKIP,
820843
SOURCE_SLEEP_AFTER_ELEMENTS,
821844
SOURCE_SLEEP_TIME,
@@ -916,6 +939,10 @@ private static Map<Map<String, String>, Collection<Row>> mapPartitionToRow(
916939
}
917940

918941
private ChangelogMode parseChangelogMode(String string) {
942+
return parseChangelogMode(string, false);
943+
}
944+
945+
private ChangelogMode parseChangelogMode(String string, boolean producesDeleteByKey) {
919946
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
920947
for (String split : string.split(",")) {
921948
switch (split.trim()) {
@@ -935,6 +962,7 @@ private ChangelogMode parseChangelogMode(String string) {
935962
throw new IllegalArgumentException("Invalid ChangelogMode string: " + string);
936963
}
937964
}
965+
builder.deletesOnKey(producesDeleteByKey);
938966
return builder.build();
939967
}
940968

@@ -1621,7 +1649,7 @@ private static class TestValuesScanTableSourceWithWatermarkPushDown
16211649
implements SupportsWatermarkPushDown, SupportsSourceWatermark {
16221650
private final String tableName;
16231651

1624-
private WatermarkStrategy<RowData> watermarkStrategy;
1652+
private WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.noWatermarks();
16251653

16261654
private TestValuesScanTableSourceWithWatermarkPushDown(
16271655
DataType producedDataType,
@@ -2207,6 +2235,7 @@ private static class TestValuesTableSink
22072235
private final int rowtimeIndex;
22082236
private final TableSchema tableSchema;
22092237
private final boolean requireBucketCount;
2238+
private final boolean supportsDeleteByKey;
22102239

22112240
private TestValuesTableSink(
22122241
DataType consumedDataType,
@@ -2220,7 +2249,8 @@ private TestValuesTableSink(
22202249
@Nullable ChangelogMode changelogModeEnforced,
22212250
int rowtimeIndex,
22222251
TableSchema tableSchema,
2223-
boolean requireBucketCount) {
2252+
boolean requireBucketCount,
2253+
boolean supportsDeleteByKey) {
22242254
this.consumedDataType = consumedDataType;
22252255
this.primaryKeyIndices = primaryKeyIndices;
22262256
this.tableName = tableName;
@@ -2233,10 +2263,19 @@ private TestValuesTableSink(
22332263
this.rowtimeIndex = rowtimeIndex;
22342264
this.tableSchema = tableSchema;
22352265
this.requireBucketCount = requireBucketCount;
2266+
this.supportsDeleteByKey = supportsDeleteByKey;
22362267
}
22372268

22382269
@Override
22392270
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
2271+
final ChangelogMode mode = getMode(requestedMode);
2272+
final ChangelogMode.Builder builder = ChangelogMode.newBuilder();
2273+
mode.getContainedKinds().forEach(builder::addContainedKind);
2274+
builder.deletesOnKey(supportsDeleteByKey);
2275+
return builder.build();
2276+
}
2277+
2278+
private ChangelogMode getMode(ChangelogMode requestedMode) {
22402279
// if param [changelogModeEnforced] is passed in, return it directly
22412280
if (changelogModeEnforced != null) {
22422281
return changelogModeEnforced;
@@ -2376,7 +2415,8 @@ public DynamicTableSink copy() {
23762415
changelogModeEnforced,
23772416
rowtimeIndex,
23782417
tableSchema,
2379-
requireBucketCount);
2418+
requireBucketCount,
2419+
supportsDeleteByKey);
23802420
}
23812421

23822422
@Override

0 commit comments

Comments
 (0)
Please sign in to comment.