-
Notifications
You must be signed in to change notification settings - Fork 251
Description
Search before asking
- I had searched in the issues and found no similar issues.
Version
com.zhongan.datacenter flink-doris-connector-1.17 25.0.0What's Wrong?
flink streamload 写doris时,会小概率发生以下问题,在出现no available backend的报错的时候,flink运行过程中添加的字段会出现以下2种情况
-
无法同步新增加的字段
-
已有字段会错列
return MySqlSource.<String>builder() .hostname(hostname) .port(port) .databaseList(database) .tableList(tableList) .username(username) .password(password) .serverTimeZone("Asia/Shanghai") .deserializer(new JsonDebeziumDeserializationSchema(false, customConverterConfigs)) .startupOptions(startupOptions.equalsIgnoreCase("timestamp") ? StartupOptions.timestamp(parameter.getLong("oceanbase.cdc.scan.startup.timestamp.ms")) : StartupOptions.initial()) .serverId(generateServerId(globalParallelism)) // .includeSchemaChanges(true) // 配置获取DDL事件的参数 .debeziumProperties(properties) .build();
2025-07-17 11:19:33.429 日志 [Data Collapse Processor -> Map -> Doris Sink By StreamLoad: Writer -> Doris Sink By StreamLoad: Committer (1/1)#0] WARN org.apache.flink.runtime.taskmanager.Task - Data Collapse Processor -> Map -> Doris Sink By StreamLoad: Writer -> Doris Sink By StreamLoad: Committer (1/1)#0 (7c87033a16b1e408531a41f7a6d65bf8_e0d9683bb400b8c0fc363829cca7469b_0_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Could not perform checkpoint 248 for operator Data Collapse Processor -> Map -> Doris Sink By StreamLoad: Writer -> Doris Sink By StreamLoad: Committer (1/1)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1256)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:68)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Writing records to streamload failed.
at org.apache.doris.flink.sink.batch.DorisBatchWriter.checkFlushException(DorisBatchWriter.java:177)
at org.apache.doris.flink.sink.batch.DorisBatchWriter.flush(DorisBatchWriter.java:133)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.prepareSnapshotPreBarrier(SinkWriterOperator.java:165)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.prepareSnapshotPreBarrier(RegularOperatorChain.java:89)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:321)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1299)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1287)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1244)
... 21 common frames omitted
Caused by: org.apache.doris.flink.exception.DorisBatchLoadException: org.apache.doris.flink.exception.DorisRuntimeException: no available backend.
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.checkFlushException(DorisBatchStreamLoad.java:326)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.doFlush(DorisBatchStreamLoad.java:251)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.intervalFlush(DorisBatchStreamLoad.java:242)
at org.apache.doris.flink.sink.batch.DorisBatchWriter.intervalFlush(DorisBatchWriter.java:118)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 common frames omitted
Caused by: org.apache.doris.flink.exception.DorisRuntimeException: no available backend.
at org.apache.doris.flink.sink.BackendUtil.getAvailableBackend(BackendUtil.java:96)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.refreshLoadUrl(DorisBatchStreamLoad.java:567)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.load(DorisBatchStreamLoad.java:461)
at org.apache.doris.flink.sink.batch.DorisBatchStreamLoad$LoadAsyncExecutor.run(DorisBatchStreamLoad.java:441)
... 3 common frames omitted
What You Expected?
- 发现FE挂掉会导致no available backend,期间报错时间段内的日志,fe.log 和 fe.warn.log 并无日志
- 错列和无法同步新加字段是否和 includeSchemaChanges(true) 注释掉有关系
How to Reproduce?
No response
Anything Else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct