From 6d44826e10d27ec09d7604058bda959413284937 Mon Sep 17 00:00:00 2001
From: dailai
Date: Thu, 7 Mar 2024 17:09:44 +0800
Subject: [PATCH] [Improve][Paimon] PaimonTypeMapper implements TypeConverter

---
 .../paimon/data/PaimonTypeMapper.java         | 182 +++++++++++++++---
 .../seatunnel/paimon/utils/SchemaUtil.java    |   7 +-
 2 files changed, 163 insertions(+), 26 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
index 5736a47bdcbe..346c0352faf0 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java
@@ -17,22 +17,53 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.data;
 
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.converter.TypeConverter;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;
 
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimeType;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class PaimonTypeMapper {
+/** Converts SeaTunnel column definitions into Paimon data types. */
+@Slf4j
+@AutoService(TypeConverter.class)
+public class PaimonTypeMapper implements TypeConverter<DataType> {
     private static final AtomicInteger fieldId = new AtomicInteger(-1);
+    public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();
+
+    @Override
+    public String identifier() {
+        return PaimonSink.PLUGIN_NAME;
+    }
 
-    public static DataType toPaimonType(SeaTunnelDataType dataType) {
-        switch (dataType.getSqlType()) {
+    @Override
+    public Column convert(DataType typeDefine) {
+        // TODO: complete when needed
+        return null;
+    }
+
+    /**
+     * Maps a SeaTunnel column to the matching Paimon type, clamping out-of-range decimal and
+     * time/timestamp precision and warning about every adjustment.
+     */
+    @Override
+    public DataType reconvert(Column column) {
+        SeaTunnelDataType seaTunnelDataType = column.getDataType();
+        switch (seaTunnelDataType.getSqlType()) {
             case BOOLEAN:
                 return DataTypes.BOOLEAN();
             case BYTES:
@@ -50,38 +81,145 @@ public static DataType toPaimonType(SeaTunnelDataType dataType) {
             case DOUBLE:
                 return DataTypes.DOUBLE();
             case DECIMAL:
-                DecimalType decimalType = (DecimalType) dataType;
-                return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
+                DecimalType decimalType = (DecimalType) seaTunnelDataType;
+                long precision = decimalType.getPrecision();
+                int scale = decimalType.getScale();
+                if (precision < org.apache.paimon.types.DecimalType.MIN_PRECISION) {
+                    precision = org.apache.paimon.types.DecimalType.DEFAULT_PRECISION;
+                    scale = org.apache.paimon.types.DecimalType.DEFAULT_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which is precision less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (precision > org.apache.paimon.types.DecimalType.MAX_PRECISION) {
+                    scale =
+                            (int)
+                                    Math.max(
+                                            0,
+                                            scale
+                                                    - (precision
+                                                            - org.apache.paimon.types.DecimalType
+                                                                    .MAX_PRECISION));
+                    precision = org.apache.paimon.types.DecimalType.MAX_PRECISION;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which exceeds the maximum precision of {}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            org.apache.paimon.types.DecimalType.MAX_PRECISION,
+                            precision,
+                            scale);
+                }
+                if (scale < org.apache.paimon.types.DecimalType.MIN_SCALE) {
+                    scale = org.apache.paimon.types.DecimalType.MIN_SCALE;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which is scale less than 0, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            scale);
+                } else if (scale > precision) {
+                    scale = (int) precision;
+                    log.warn(
+                            "The decimal column {} type decimal({},{}) is out of range, "
+                                    + "which exceeds the maximum scale of {}, "
+                                    + "it will be converted to decimal({},{})",
+                            column.getName(),
+                            decimalType.getPrecision(),
+                            decimalType.getScale(),
+                            precision,
+                            precision,
+                            scale);
+                }
+                return DataTypes.DECIMAL((int) precision, scale);
+            case DATE:
+                return DataTypes.DATE();
+            case TIME:
+                Integer timeScale = column.getScale();
+                if (timeScale != null && timeScale > TimeType.MAX_PRECISION) {
+                    timeScale = TimeType.MAX_PRECISION;
+                    log.warn(
+                            "The time column {} type time({}) is out of range, "
+                                    + "which exceeds the maximum scale of {}, "
+                                    + "it will be converted to time({})",
+                            column.getName(),
+                            column.getScale(),
+                            TimeType.MAX_PRECISION,
+                            timeScale);
+                }
+                // Scale may be null: use the no-arg overload rather than unboxing null.
+                return timeScale == null ? DataTypes.TIME() : DataTypes.TIME(timeScale);
+            case TIMESTAMP:
+                Integer timestampScale = column.getScale();
+                if (timestampScale != null
+                        && timestampScale > LocalZonedTimestampType.MAX_PRECISION) {
+                    timestampScale = LocalZonedTimestampType.MAX_PRECISION;
+                    log.warn(
+                            "The timestamp column {} type timestamp({}) is out of range, "
+                                    + "which exceeds the maximum scale of {}, "
+                                    + "it will be converted to timestamp({})",
+                            column.getName(),
+                            column.getScale(),
+                            LocalZonedTimestampType.MAX_PRECISION,
+                            timestampScale);
+                }
+                // Scale may be null: use the no-arg overload rather than unboxing null.
+                return timestampScale == null
+                        ? DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        : DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(timestampScale);
+            case STRING:
+                return DataTypes.STRING();
             case ARRAY:
-                ArrayType arrayType = (ArrayType) dataType;
-                // converter elementType
-                DataType elementType = toPaimonType(arrayType.getElementType());
+                ArrayType arrayType = (ArrayType) seaTunnelDataType;
+                DataType elementType =
+                        reconvert(getPhysicalColumn(column, arrayType.getElementType()));
                 return DataTypes.ARRAY(elementType);
             case MAP:
                 org.apache.seatunnel.api.table.type.MapType mapType =
-                        (org.apache.seatunnel.api.table.type.MapType) dataType;
-                DataType keyType = toPaimonType(mapType.getKeyType());
-                DataType valueType = toPaimonType(mapType.getValueType());
-                return DataTypes.MAP(keyType, valueType);
+                        (org.apache.seatunnel.api.table.type.MapType) seaTunnelDataType;
+                return DataTypes.MAP(
+                        reconvert(getPhysicalColumn(column, mapType.getKeyType())),
+                        reconvert(getPhysicalColumn(column, mapType.getValueType())));
             case ROW:
-                SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) dataType;
+                SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) seaTunnelDataType;
                 DataField[] dataFields = new DataField[seaTunnelRowType.getTotalFields()];
                 for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
                     String field = seaTunnelRowType.getFieldName(i);
                     SeaTunnelDataType fieldType = seaTunnelRowType.getFieldType(i);
                     int id = fieldId.incrementAndGet();
-                    dataFields[i] = new DataField(id, field, toPaimonType(fieldType));
+                    dataFields[i] =
+                            new DataField(
+                                    id, field, reconvert(getPhysicalColumn(column, fieldType)));
                 }
                 return DataTypes.ROW(dataFields);
-            case DATE:
-                return DataTypes.DATE();
-            case TIME:
-                return DataTypes.TIME();
-            case TIMESTAMP:
-                return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
-            case STRING:
             default:
-                return DataTypes.STRING();
+                throw CommonError.convertToConnectorTypeError(
+                        identifier(), column.getDataType().getSqlType().name(), column.getName());
         }
     }
+
+    /**
+     * Wraps a nested element/key/value/field type in a column that inherits the parent column's
+     * name, default value and comment, so it can be passed back to {@link #reconvert(Column)}.
+     */
+    private PhysicalColumn getPhysicalColumn(
+            Column column, SeaTunnelDataType seaTunnelDataType) {
+        return PhysicalColumn.of(
+                column.getName(),
+                seaTunnelDataType,
+                0,
+                true,
+                column.getDefaultValue(),
+                column.getComment());
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
index a05f8e768be8..aaa0f7407f99 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java
@@ -20,7 +20,6 @@
 import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;
 
 import org.apache.paimon.schema.Schema;
@@ -31,15 +30,15 @@
 /** The util seatunnel schema to paimon schema */
 public class SchemaUtil {
 
-    public static DataType toPaimonType(SeaTunnelDataType rowType) {
-        return PaimonTypeMapper.toPaimonType(rowType);
+    public static DataType toPaimonType(Column column) {
+        return PaimonTypeMapper.INSTANCE.reconvert(column);
     }
 
     public static Schema toPaimonSchema(TableSchema tableSchema) {
         Schema.Builder paiSchemaBuilder = Schema.newBuilder();
         for (int i = 0; i < tableSchema.getColumns().size(); i++) {
             Column column = tableSchema.getColumns().get(i);
-            paiSchemaBuilder.column(column.getName(), toPaimonType(column.getDataType()));
+            paiSchemaBuilder.column(column.getName(), toPaimonType(column));
         }
         PrimaryKey primaryKey = tableSchema.getPrimaryKey();
         if (Objects.nonNull(primaryKey) && primaryKey.getColumnNames().size() > 0) {