I'm writing to Doris with Flink through the flink-doris-connector's RowData API. Flink is 1.19.1, Doris is 2.1.6, and flink-doris-connector is 1.19-24.0.0.
The table is created as follows:
CREATE TABLE tt.aa (
  type int NOT NULL COMMENT 'data source',
  sensor_id varchar(64) NOT NULL COMMENT 'vehicle id',
  latest_time datetime NOT NULL COMMENT 'report time, yyyy-MM-dd HH:mm:ss',
  data_code varchar(64) NULL COMMENT 'product code',
  bike_status int NULL COMMENT 'vehicle running status',
  order_id varchar(64) NULL COMMENT 'order number',
  ws_lat float NULL COMMENT 'latitude',
  ws_lng float NULL COMMENT 'longitude',
  ws_type int DEFAULT '2' COMMENT 'coordinate system used'
)
UNIQUE KEY(type, sensor_id, latest_time)
COMMENT 'bike location table'
PARTITION BY RANGE(latest_time)()
DISTRIBUTED BY HASH(type, sensor_id) BUCKETS 4
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1",
  "dynamic_partition.enable" = "true",
  "dynamic_partition.create_history_partition" = "true",
  "dynamic_partition.time_unit" = "HOUR",
  "dynamic_partition.start" = "-480",
  "dynamic_partition.end" = "12",
  "dynamic_partition.prefix" = "p"
);
The RowData mapper is set up as:
import java.sql.Timestamp;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
public class BPRDMF implements MapFunction<BP, RowData> {
}
The Doris sink in Flink is configured with:
String[] fields=new String[] {
"type",
"sensor_id",
"latest_time",
"data_code",
"bike_status",
"order_id",
"ws_lat",
"ws_lng"
};
DataType[] data_types = new DataType[] {
  DataTypes.TINYINT(),
  DataTypes.VARCHAR(16),
  DataTypes.TIMESTAMP(),
  DataTypes.VARCHAR(16),
  DataTypes.TINYINT(),
  DataTypes.VARCHAR(48),
  DataTypes.DOUBLE(),
  DataTypes.DOUBLE()
};
With CSV format the stream load is rejected, because the serializer sends only the 8 fields listed above while the table schema has 9 columns (ws_type is not in the field list):
Reason: actual column number in csv file is less than schema column number.actual number: 8, schema column number: 9; line delimiter:
If the CSV settings are dropped and the format is switched to JSON, the partial column update works correctly:
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
RowDataSerializer.builder().setType(LoadConstants.JSON)
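For reference, the JSON-based sink wiring can be sketched as below. This is a sketch against the 1.19-24.0.0 connector API, not a verified configuration: the builder methods follow the connector's RowDataSerializer and DorisExecutionOptions builders, and "partial_columns" is the Doris stream-load property that allows loading a subset of columns so the omitted ws_type keeps its default.

```java
// Stream-load properties: JSON lines instead of CSV, plus partial update
// so the omitted ws_type column falls back to its default value.
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
// Assumption: partial update requires a merge-on-write UNIQUE KEY table.
properties.setProperty("partial_columns", "true");

DorisExecutionOptions executionOptions = DorisExecutionOptions.builder()
        .setStreamLoadProp(properties)
        .build();

// Serialize each RowData as a JSON object keyed by the 8 field names,
// so the payload's column count no longer has to match the full schema.
RowDataSerializer serializer = RowDataSerializer.builder()
        .setFieldNames(fields)
        .setFieldType(data_types)
        .setType(LoadConstants.JSON)
        .build();
```

With CSV, by contrast, the payload is positional, which is why the missing ninth column triggers the column-number error above.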