
Commit f309d31

[FLINK-37429][transform] Map each column name to a new name in generated expression
This closes #3939.

Co-authored-by: Leonard Xu <[email protected]>
1 parent e59a170 commit f309d31
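For context on what "map each column name to a new name in generated expression" addresses: transform projection and filter expressions are ultimately compiled into Java code, and source column names such as `foo-bar` or `timestamp-type` are not legal Java identifiers. The sketch below is a minimal, hypothetical illustration of that idea only, not the actual Flink CDC implementation; the helper names buildColumnNameMap / rewriteExpression and the "$0"-style generated names are assumptions made for this example.

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: map every original column name to an identifier-safe generated name,
// then rewrite backtick-quoted references before handing the expression to a Java compiler.
public class ColumnNameMapSketch {

    static Map<String, String> buildColumnNameMap(Iterable<String> originalColumnNames) {
        Map<String, String> columnNameMap = new LinkedHashMap<>();
        int index = 0;
        for (String original : originalColumnNames) {
            columnNameMap.put(original, "$" + index++); // '$' is legal in a Java identifier
        }
        return columnNameMap;
    }

    static String rewriteExpression(String expression, Map<String, String> columnNameMap) {
        String rewritten = expression;
        for (Map.Entry<String, String> entry : columnNameMap.entrySet()) {
            rewritten = rewritten.replace("`" + entry.getKey() + "`", entry.getValue());
        }
        return rewritten;
    }

    public static void main(String[] args) {
        Map<String, String> map =
                buildColumnNameMap(java.util.Arrays.asList("class", "foo-bar", "bar-foo"));
        // Prints "$1 > 0" -- the filter used by the test below ("`foo-bar` > 0") in compilable form.
        System.out.println(rewriteExpression("`foo-bar` > 0", map));
    }
}

The test added in FlinkPipelineTransformITCase below exercises exactly this situation: it projects "*, `timestamp-type`" and filters on "`foo-bar` > 0" against a table whose column names all contain hyphens.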

File tree

18 files changed, +880 -112 lines changed


flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java

+63
@@ -956,6 +956,69 @@ void testTransformWithTemporalFunction() throws Exception {
         Arrays.stream(outputEvents).forEach(this::extractDataLines);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    public void testTransformWithColumnNameMap(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.COMPLEX_COLUMN_NAME_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup transform
+        TransformDef transformDef =
+                new TransformDef(
+                        "default_namespace.default_schema.table1",
+                        "*, `timestamp-type`",
+                        "`foo-bar` > 0",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        new ArrayList<>(Arrays.asList(transformDef)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+        assertThat(outputEvents)
+                .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`class` STRING NOT NULL,`foo-bar` INT,`bar-foo` INT,`timestamp-type` STRING NOT NULL}, primaryKeys=class, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[class1, 1, 10, type1], op=INSERT, meta=({timestamp-type=type1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[class2, 2, 100, type2], op=INSERT, meta=({timestamp-type=type2})}",
+                        "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`import-package` STRING, position=AFTER, existedColumnName=bar-foo}]}",
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={bar-foo=bar-baz}}",
+                        "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[bar-baz]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[class1, 1, , type1], after=[], op=DELETE, meta=({timestamp-type=type1})}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[class2, 2, , type2], after=[new-class2, 20, new-package2, type2], op=UPDATE, meta=({timestamp-type=type2})}");
+    }
+
     void runGenericTransformTest(
             ValuesDataSink.SinkApi sinkApi,
             List<TransformDef> transformDefs,
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java

+48

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.values.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for timestamp-type. */
+public class TimestampTypeMetadataColumn implements SupportedMetadataColumn {
+
+    @Override
+    public String getName() {
+        return "timestamp-type";
+    }
+
+    @Override
+    public DataType getType() {
+        return DataTypes.STRING();
+    }
+
+    @Override
+    public Class<?> getJavaClass() {
+        return String.class;
+    }
+
+    @Override
+    public Object read(Map<String, String> metadata) {
+        return metadata.getOrDefault(getName(), null);
+    }
+}
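A quick usage sketch, hypothetical and for illustration only: the values connector registers this metadata column so a transform rule can reference `timestamp-type` by name, and read simply looks the value up in an event's metadata map, as the snippet below shows using only the methods defined above.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.connectors.values.source.TimestampTypeMetadataColumn;

public class TimestampTypeMetadataColumnExample {
    public static void main(String[] args) {
        TimestampTypeMetadataColumn column = new TimestampTypeMetadataColumn();

        Map<String, String> meta = new HashMap<>();
        meta.put("timestamp-type", "type1"); // same metadata key used by the events in ValuesDataSourceHelper below

        // Prints "timestamp-type = type1"; read returns null when the key is absent.
        System.out.println(column.getName() + " = " + column.read(meta));
    }
}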

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java

+3-1
@@ -83,7 +83,9 @@ public MetadataAccessor getMetadataAccessor() {
 
     @Override
     public SupportedMetadataColumn[] supportedMetadataColumns() {
-        return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
+        return new SupportedMetadataColumn[] {
+            new OpTsMetadataColumn(), new TimestampTypeMetadataColumn()
+        };
     }
 
     /**

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java

+130-1
@@ -30,6 +30,7 @@
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 
 import java.util.ArrayList;
@@ -54,7 +55,8 @@ public enum EventSetId {
         SINGLE_SPLIT_MULTI_TABLES,
         MULTI_SPLITS_SINGLE_TABLE,
         CUSTOM_SOURCE_EVENTS,
-        TRANSFORM_TABLE
+        TRANSFORM_TABLE,
+        COMPLEX_COLUMN_NAME_TABLE
     }
 
     public static final TableId TABLE_1 =
@@ -120,6 +122,11 @@ public static void setSourceEvents(EventSetId eventType) {
                     sourceEvents = transformTable();
                     break;
                 }
+            case COMPLEX_COLUMN_NAME_TABLE:
+                {
+                    sourceEvents = complexColumnNameTable();
+                    break;
+                }
             default:
                 throw new IllegalArgumentException(eventType + " is not supported");
         }
@@ -644,4 +651,126 @@ public static List<List<Event>> transformTable() {
         eventOfSplits.add(split1);
         return eventOfSplits;
     }
+
+    public static List<List<Event>> complexColumnNameTable() {
+        List<List<Event>> eventOfSplits = new ArrayList<>();
+        List<Event> split1 = new ArrayList<>();
+
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("class", DataTypes.STRING())
+                        .physicalColumn("foo-bar", DataTypes.INT())
+                        .physicalColumn("bar-foo", DataTypes.INT())
+                        .primaryKey("class")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        split1.add(createTableEvent);
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator((RowType) schema.toRowDataType());
+        // insert
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class0"), 0, 0,
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type0");
+                            }
+                        });
+        split1.add(insertEvent1);
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class1"), 1, 10,
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type1");
+                            }
+                        });
+        split1.add(insertEvent2);
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {BinaryStringData.fromString("class2"), 2, 100}),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type2");
+                            }
+                        });
+        split1.add(insertEvent3);
+
+        // add column
+        AddColumnEvent.ColumnWithPosition columnWithPosition =
+                new AddColumnEvent.ColumnWithPosition(
+                        Column.physicalColumn("import-package", DataTypes.STRING()));
+        AddColumnEvent addColumnEvent =
+                new AddColumnEvent(TABLE_1, Collections.singletonList(columnWithPosition));
+        split1.add(addColumnEvent);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
+
+        // rename column
+        Map<String, String> nameMapping = new HashMap<>();
+        nameMapping.put("bar-foo", "bar-baz");
+        RenameColumnEvent renameColumnEvent = new RenameColumnEvent(TABLE_1, nameMapping);
+        split1.add(renameColumnEvent);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, renameColumnEvent);
+
+        // drop column
+        DropColumnEvent dropColumnEvent =
+                new DropColumnEvent(TABLE_1, Collections.singletonList("bar-baz"));
+        split1.add(dropColumnEvent);
+        schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent);
+
+        generator = new BinaryRecordDataGenerator((RowType) schema.toRowDataType());
+
+        // delete
+        split1.add(
+                DataChangeEvent.deleteEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class1"),
+                                    1,
+                                    BinaryStringData.fromString(""),
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type1");
+                            }
+                        }));
+
+        // update
+        split1.add(
+                DataChangeEvent.updateEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("class2"),
+                                    2,
+                                    BinaryStringData.fromString("")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("new-class2"),
+                                    20,
+                                    BinaryStringData.fromString("new-package2"),
+                                }),
+                        new HashMap<String, String>() {
+                            {
+                                put("timestamp-type", "type2");
+                            }
+                        }));
+
+        eventOfSplits.add(split1);
+        return eventOfSplits;
+    }
 }

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java

+1-1
@@ -409,7 +409,7 @@ private Optional<DataChangeEvent> processDataChangeEvent(DataChangeEvent dataCha
             Optional<TransformFilter> transformFilterOptional = transform.getFilter();
 
             if (transformFilterOptional.isPresent()
-                    && transformFilterOptional.get().isVaild()) {
+                    && transformFilterOptional.get().isValid()) {
                 TransformFilter transformFilter = transformFilterOptional.get();
                 if (!transformFilterProcessorMap.containsKey(
                         Tuple2.of(tableId, transformFilter))) {
