Skip to content

不支持标准的Debezium JSON吗 #617

@benxiaohai061

Description

@benxiaohai061

我现在想实现一个实时同步mysql数据到doris的方案(整库同步)。但由于网络隔离原因。 mysql和doris不在同一个网络。我使用了如下方案。
1、通过flink-cdc 实时把mysql的数据生成Debezium JSON格式文件,
2、然后再通过FTP传入内网。
3、再使用flink程序实时读取内网接收的文件入doris。

但现在出现的问题是。
1、doris的表必须存在。 不能自动建表。
2、标准Debezium JSON的格式,doris sink执行会报错。
15:51:34,285 WARN org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChange [] - Failed to extract tableChanges. record={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_binlog_source.test.students.Envelope"},"payload":{"before":null,"after":{"student_id":5,"first_name":"Charlie","last_name":"Davis","gender":"Other","date_of_birth":"2002-07-30","email":"[email protected]","phone_number":null,"address":null,"enrollment_date":"2023-09-01","is_active":0},"source":{"version":"1.9.8.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"students","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1760686786704,"transaction":null}}
15:51:34,286 WARN org.apache.doris.flink.sink.writer.serializer.jsondebezium.JsonDebeziumSchemaChangeImplV2 [] - Failed to parse eventType. recordRoot={"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"student_id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"Male,Female,Other"},"field":"gender"},{"type":"string","optional":true,"field":"date_of_birth"},{"type":"string","optional":true,"field":"email"},{"type":"string","optional":true,"field":"phone_number"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"enrollment_date"},{"type":"int16","optional":true,"default":1,"field":"is_active"}],"optional":true,"name":"mysql_binlog_source.test.students.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_binlog_source.test.students.Envelope"},"payload":{"before":null,"after":{"student_id":5,"first_name":"Charlie","last_name":"Davis","gender":"Other","date_of_birth":"2002-07-30","email":"[email protected]","phone_number":null,"address":null,"enrollment_date":"2023-09-01","is_active":0},"source":{"version":"1.9.8.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"students","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1760686786704,"transaction":null}}
15:51:34,286 W

相关代码是:
1、实时生成文件
Map<String, Object> customConverterConfigs = new HashMap<>();
customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
JsonDebeziumDeserializationSchema schema =
new JsonDebeziumDeserializationSchema(true, customConverterConfigs);

    MySqlSource<String> mySqlSource =
            MySqlSource.<String>builder()
                    .hostname("127.0.0.1")
                    .port(23306)
                    .databaseList("test") // set captured database
                    .tableList("test\\..*")// set captured table
                    .username("root")
                    .password("123456")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .includeSchemaChanges(true) // converts SourceRecord to JSON String
                    .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 设置 3s 的 checkpoint 间隔
    env.enableCheckpointing(3000);

    DataStream<String> cdcStream = env
            .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            .setParallelism(2);

    // 3. 文件系统 Sink
    FileSink<String> fileSink = FileSink
            .forRowFormat(
                    new Path("D:/data/mysql_cdc/"),          // 输出路径
                    new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            .withRolloverInterval(Duration.ofMinutes(1))   // 每分钟滚动
                            .withInactivityInterval(Duration.ofSeconds(30)) // 空闲超过30秒滚动
                            .withMaxPartSize(1024 * 1024 * 128)            // 128MB
                            .build()
            )
            .build();

    // 4. 写入文件
    cdcStream.sinkTo(fileSink).setParallelism(1);

    env.execute("MySQL CDC to File");

2、写入doris
public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 开启 checkpoint
    env.enableCheckpointing(10000);

    // ----------------------
    // 1. FileSource 读取 MySQL CDC JSON 文件
    // ----------------------
    FileSource<String> fileSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("D:/data/mysql_cdc/"))
            .monitorContinuously(Duration.ofSeconds(5)) // 每5秒扫描一次新文件
            .build();

    DataStream<String> fileStream = env.fromSource(
            fileSource,
            WatermarkStrategy.noWatermarks(),
            "File Source"
    );

    // ----------------------
    // 2. Doris Sink 配置
    // ----------------------
    Properties props = new Properties();
    props.setProperty("format", "json");
    props.setProperty("read_json_by_line", "true");

    DorisOptions dorisOptions = DorisOptions.builder()
            .setFenodes("127.0.0.1:8030")
            .setTableIdentifier("")
            .setUsername("root")
            .setPassword("123456")
            .build();

    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder
            .setLabelPrefix("label-doris" + UUID.randomUUID())
            .setStreamLoadProp(props)
            .setDeletable(true);

    DorisSink.Builder<String> builder = DorisSink.builder();
    builder.setDorisReadOptions(DorisReadOptions.builder().build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setDorisOptions(dorisOptions)
            .setSerializer(
                    JsonDebeziumSchemaSerializer.builder()
                            .setDorisOptions(dorisOptions)
                            .setNewSchemaChange(true)
                            .build());

    // ----------------------
    // 3. 写入 Doris
    // ----------------------
    fileStream/*.map(new DebeziumToDorisJsonMapper())*/.sinkTo(builder.build());

    env.execute("File to Doris");
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions