flink partial update failed #51717
Replies: 1 comment
-
参数设置错误,ok了 |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
使用flink通过flink-connector-doris的rowdata写入doris 报错,flink是1.19.1版本。doris是2.1.6版本。flink-doris-connector 版本是1.19-24.0.0.
建表语句如下,
CREATE TABLE tt.aa (
type int NOT NULL COMMENT '数据来源',
sensor_id varchar(64) NOT NULL COMMENT '车辆id',
latest_time datetime NOT NULL COMMENT '上报时间 yyyy-MM-dd HH:mm:ss',
data_code varchar(64) NULL COMMENT '产品编码',
bike_status int NULL COMMENT '车辆运行',
order_id varchar(64) NULL COMMENT '订单编号',
ws_lat float NULL COMMENT '纬度',
ws_lng float NULL COMMENT '经度',
ws_type int DEFAULT '2' COMMENT '使用的坐标'
)
UNIQUE KEY(type,sensor_id,latest_time)
COMMENT '单车定位信息表'
PARTITION BY RANGE(latest_time)()
DISTRIBUTED BY HASH(type,sensor_id) BUCKETS 4
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "HOUR",
"dynamic_partition.start" = "-480",
"dynamic_partition.end" = "12",
"dynamic_partition.prefix" = "p"
);
rowdata的配置是
import java.sql.Timestamp;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
public class BPRDMF implements MapFunction<BP, RowData> {
}
flink的doris的sink配置是
String[] fields=new String[] {
"type",
"sensor_id",
"latest_time",
"data_code",
"bike_status",
"order_id",
"ws_lat",
"ws_lng"
};
DataType[] data_types=new DataType[] {
DataTypes.TINYINT(),
DataTypes.VARCHAR(16),
DataTypes.TIMESTAMP(),
DataTypes.VARCHAR(16),
DataTypes.TINYINT(),
DataTypes.VARCHAR(48),
DataTypes.DOUBLE(),
DataTypes.DOUBLE(),
}
省略positionStream的生成过程;positionStream正确无误。
实际执行过程中,参数都配置正确,语法配置均无错误。
Process one query failed because.
org.apache.doris.common.AnalysisException: errCode = 2, detailMessage = Syntax error in line 8008:
ON DUPLICATE KEY UPDATE
^
Encountered: ON
Expected: COMMA
at org.apache.doris.qe.ConnectProcessor.parse(ConnectProcessor.java:493) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:274) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:206) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:272) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:300) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:358) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_352-352]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_352-352]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_352-352]
Caused by: org.apache.doris.common.AnalysisException: errCode = 2, detailMessage = Syntax error
at org.apache.doris.analysis.SqlParser.unrecovered_syntax_error(SqlParser.java:2988) ~[doris-fe.jar:1.2-SNAPSHOT]
at java_cup.runtime.lr_parser.parse(lr_parser.java:616) ~[jflex-1.4.3.jar:?]
at org.apache.doris.common.util.SqlParserUtils.getMultiStmts(SqlParserUtils.java:60) ~[doris-fe.jar:1.2-SNAPSHOT]
at org.apache.doris.qe.ConnectProcessor.parse(ConnectProcessor.java:482) ~[doris-fe.jar:1.2-SNAPSHOT]
... 9 more
Beta Was this translation helpful? Give feedback.
All reactions