Skip to content

Commit

Permalink
[Improve][Paimon] PaimonTypeMapper implements TypeConverter
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai committed Mar 7, 2024
1 parent d1925e8 commit 6d44826
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,48 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.data;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.PaimonSink;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimeType;

import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicInteger;

public class PaimonTypeMapper {
@Slf4j
@AutoService(TypeConverter.class)
public class PaimonTypeMapper implements TypeConverter<DataType> {
private static final AtomicInteger fieldId = new AtomicInteger(-1);
public static final PaimonTypeMapper INSTANCE = new PaimonTypeMapper();

@Override
public String identifier() {
return PaimonSink.PLUGIN_NAME;
}

public static DataType toPaimonType(SeaTunnelDataType dataType) {
switch (dataType.getSqlType()) {
@Override
public Column convert(DataType typeDefine) {
// todo compelete when need
return null;
}

@Override
public DataType reconvert(Column column) {
SeaTunnelDataType<?> seaTunnelDataType = column.getDataType();
switch (seaTunnelDataType.getSqlType()) {
case BOOLEAN:
return DataTypes.BOOLEAN();
case BYTES:
Expand All @@ -50,38 +76,137 @@ public static DataType toPaimonType(SeaTunnelDataType dataType) {
case DOUBLE:
return DataTypes.DOUBLE();
case DECIMAL:
DecimalType decimalType = (DecimalType) dataType;
return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
DecimalType decimalType = (DecimalType) seaTunnelDataType;
long precision = decimalType.getPrecision();
int scale = decimalType.getScale();
if (precision < org.apache.paimon.types.DecimalType.MIN_PRECISION) {
precision = org.apache.paimon.types.DecimalType.DEFAULT_PRECISION;
scale = org.apache.paimon.types.DecimalType.DEFAULT_SCALE;
log.warn(
"The decimal column {} type decimal({},{}) is out of range, "
+ "which is precision less than 0, "
+ "it will be converted to decimal({},{})",
column.getName(),
decimalType.getPrecision(),
decimalType.getScale(),
precision,
scale);
} else if (precision > org.apache.paimon.types.DecimalType.MAX_PRECISION) {
scale =
(int)
Math.max(
0,
scale
- (precision
- org.apache.paimon.types.DecimalType
.MAX_PRECISION));
precision = org.apache.paimon.types.DecimalType.MAX_PRECISION;
log.warn(
"The decimal column {} type decimal({},{}) is out of range, "
+ "which exceeds the maximum precision of {}, "
+ "it will be converted to decimal({},{})",
column.getName(),
decimalType.getPrecision(),
decimalType.getScale(),
org.apache.paimon.types.DecimalType.MAX_PRECISION,
precision,
scale);
}
if (scale < org.apache.paimon.types.DecimalType.MIN_SCALE) {
scale = org.apache.paimon.types.DecimalType.MIN_SCALE;
log.warn(
"The decimal column {} type decimal({},{}) is out of range, "
+ "which is scale less than 0, "
+ "it will be converted to decimal({},{})",
column.getName(),
decimalType.getPrecision(),
decimalType.getScale(),
precision,
scale);
} else if (scale > precision) {
scale = (int) precision;
log.warn(
"The decimal column {} type decimal({},{}) is out of range, "
+ "which exceeds the maximum scale of {}, "
+ "it will be converted to decimal({},{})",
column.getName(),
decimalType.getPrecision(),
decimalType.getScale(),
precision,
precision,
scale);
}
return DataTypes.DECIMAL((int) precision, scale);
case DATE:
return DataTypes.DATE();
case TIME:
Integer timeScale = column.getScale();
if (timeScale != null && timeScale > TimeType.MAX_PRECISION) {
timeScale = TimeType.MAX_PRECISION;
log.warn(
"The time column {} type time({}) is out of range, "
+ "which exceeds the maximum scale of {}, "
+ "it will be converted to time({})",
column.getName(),
column.getScale(),
TimeType.MAX_PRECISION,
timeScale);
}
return DataTypes.TIME(timeScale);
case TIMESTAMP:
Integer timestampScale = column.getScale();
if (timestampScale != null
&& timestampScale > LocalZonedTimestampType.MAX_PRECISION) {
timestampScale = LocalZonedTimestampType.MAX_PRECISION;
log.warn(
"The timestamp column {} type timestamp({}) is out of range, "
+ "which exceeds the maximum scale of {}, "
+ "it will be converted to timestamp({})",
column.getName(),
column.getScale(),
LocalZonedTimestampType.MAX_PRECISION,
timestampScale);
}
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(timestampScale);
case STRING:
return DataTypes.STRING();
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
// converter elementType
DataType elementType = toPaimonType(arrayType.getElementType());
ArrayType arrayType = (ArrayType) seaTunnelDataType;
DataType elementType =
reconvert(getPhysicalColumn(column, arrayType.getElementType()));
return DataTypes.ARRAY(elementType);
case MAP:
org.apache.seatunnel.api.table.type.MapType mapType =
(org.apache.seatunnel.api.table.type.MapType) dataType;
DataType keyType = toPaimonType(mapType.getKeyType());
DataType valueType = toPaimonType(mapType.getValueType());
return DataTypes.MAP(keyType, valueType);
(org.apache.seatunnel.api.table.type.MapType) seaTunnelDataType;
return DataTypes.MAP(
reconvert(getPhysicalColumn(column, mapType.getKeyType())),
reconvert(getPhysicalColumn(column, mapType.getValueType())));
case ROW:
SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) dataType;
SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) seaTunnelDataType;
DataField[] dataFields = new DataField[seaTunnelRowType.getTotalFields()];
for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
String field = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType fieldType = seaTunnelRowType.getFieldType(i);
int id = fieldId.incrementAndGet();
dataFields[i] = new DataField(id, field, toPaimonType(fieldType));
dataFields[i] =
new DataField(
id, field, reconvert(getPhysicalColumn(column, fieldType)));
}
return DataTypes.ROW(dataFields);
case DATE:
return DataTypes.DATE();
case TIME:
return DataTypes.TIME();
case TIMESTAMP:
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
case STRING:
default:
return DataTypes.STRING();
throw CommonError.convertToConnectorTypeError(
identifier(), column.getDataType().getSqlType().name(), column.getName());
}
}

private PhysicalColumn getPhysicalColumn(
Column column, SeaTunnelDataType<?> seaTunnelDataType) {
return PhysicalColumn.of(
column.getName(),
seaTunnelDataType,
0,
true,
column.getDefaultValue(),
column.getComment());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;

import org.apache.paimon.schema.Schema;
Expand All @@ -31,15 +30,15 @@
/** The util seatunnel schema to paimon schema */
public class SchemaUtil {

public static DataType toPaimonType(SeaTunnelDataType rowType) {
return PaimonTypeMapper.toPaimonType(rowType);
public static DataType toPaimonType(Column column) {
return PaimonTypeMapper.INSTANCE.reconvert(column);
}

public static Schema toPaimonSchema(TableSchema tableSchema) {
Schema.Builder paiSchemaBuilder = Schema.newBuilder();
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
Column column = tableSchema.getColumns().get(i);
paiSchemaBuilder.column(column.getName(), toPaimonType(column.getDataType()));
paiSchemaBuilder.column(column.getName(), toPaimonType(column));
}
PrimaryKey primaryKey = tableSchema.getPrimaryKey();
if (Objects.nonNull(primaryKey) && primaryKey.getColumnNames().size() > 0) {
Expand Down

0 comments on commit 6d44826

Please sign in to comment.