From 5b324883148389b5fd57c279cd4bcb89b0e5731b Mon Sep 17 00:00:00 2001 From: dailai Date: Fri, 8 Mar 2024 09:47:18 +0800 Subject: [PATCH] [Feature][Paimon] paimon sink support savemode --- .../paimon/catalog/PaimonCatalog.java | 215 ++++++++++++++++++ .../paimon/catalog/PaimonCatalogFactory.java | 54 +++++ .../paimon/catalog/PaimonCatalogLoader.java | 61 +++++ .../seatunnel/paimon/catalog/PaimonTable.java | 28 +++ .../seatunnel/paimon/config/PaimonConfig.java | 9 +- .../paimon/config/PaimonSinkConfig.java | 66 ++++++ .../paimon/data/PaimonTypeMapper.java | 123 +++++++++- .../paimon/handler/PaimonSaveModeHandler.java | 58 +++++ .../seatunnel/paimon/sink/PaimonSink.java | 112 ++++----- .../paimon/sink/PaimonSinkFactory.java | 2 +- .../paimon/sink/SupportLoadTable.java | 22 ++ .../seatunnel/paimon/utils/SchemaUtil.java | 5 + 12 files changed, 698 insertions(+), 57 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java new file mode 100644 index 00000000000..7312ed28b06 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.types.DataField; + +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +@Slf4j +public class PaimonCatalog implements Catalog, PaimonTable { + private static final String DEFAULT_DATABASE = "default"; + + private String catalogName; + private ReadonlyConfig readonlyConfig; + private PaimonCatalogLoader paimonCatalogLoader; + private org.apache.paimon.catalog.Catalog catalog; + + public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) { + this.readonlyConfig = readonlyConfig; + this.catalogName = catalogName; + this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonSinkConfig(readonlyConfig)); + } + + @Override + public void open() throws CatalogException { + this.catalog = paimonCatalogLoader.loadCatalog(); + } + + @Override + public void close() throws CatalogException { + if (catalog != null && catalog instanceof Closeable) { + try { + ((Closeable) catalog).close(); + } catch (IOException e) { + log.error("Error while closing IcebergCatalog.", e); + throw new CatalogException(e); + } + } + } + + @Override + public String name() { + return this.catalogName; + } + + @Override + public String getDefaultDatabase() throws CatalogException { + return DEFAULT_DATABASE; + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + return catalog.databaseExists(databaseName); + } + + @Override + public List listDatabases() throws CatalogException { + return catalog.listDatabases(); + } + + @Override + public List listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + try { + return catalog.listTables(databaseName); + } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + return catalog.tableExists(toIdentifier(tablePath)); + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + try { + FileStoreTable paimonFileStoreTableTable = (FileStoreTable) getPaimonTable(tablePath); + return toCatalogTable(paimonFileStoreTableTable, tablePath); + } catch (Exception e) { + throw new TableNotExistException(this.catalogName, tablePath); + } + } + + @Override + public Table getPaimonTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + try { + return catalog.getTable(toIdentifier(tablePath)); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new TableNotExistException(this.catalogName, tablePath); + } + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + try { + catalog.createTable( + toIdentifier(tablePath), + SchemaUtil.toPaimonSchema(table.getTableSchema()), + ignoreIfExists); + } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) { + throw new TableAlreadyExistException(this.catalogName, tablePath); + } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) { + throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName()); + } + } + + @Override + public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + try { + catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists); + } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { + throw new TableNotExistException(this.catalogName, tablePath); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + try { + catalog.createDatabase(tablePath.getDatabaseName(), ignoreIfExists); + } catch (org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException e) { + throw new DatabaseAlreadyExistException(this.catalogName, tablePath.getDatabaseName()); + } + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + try { + catalog.dropDatabase(tablePath.getDatabaseName(), ignoreIfNotExists, true); + } catch (Exception e) { + throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName()); + } + } + + private CatalogTable toCatalogTable( + FileStoreTable paimonFileStoreTableTable, TablePath tablePath) { + org.apache.paimon.schema.TableSchema schema = paimonFileStoreTableTable.schema(); + List dataFields = schema.fields(); + TableSchema.Builder builder = TableSchema.builder(); + dataFields.forEach( + dataField -> { + String name = dataField.name(); + SeaTunnelDataType seaTunnelType = + SchemaUtil.toSeaTunnelType(dataField.type()); + PhysicalColumn physicalColumn = + PhysicalColumn.of( + name, + seaTunnelType, + (Long) null, + true, + null, + dataField.description()); + builder.column(physicalColumn); + }); + + List partitionKeys = schema.partitionKeys(); + + return CatalogTable.of( + org.apache.seatunnel.api.table.catalog.TableIdentifier.of( + catalogName, tablePath.getDatabaseName(), tablePath.getTableName()), + builder.build(), + paimonFileStoreTableTable.options(), + partitionKeys, + null, + catalogName); + } + + private Identifier toIdentifier(TablePath tablePath) { + return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java new file mode 100644 index 00000000000..4d94f385d9f --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.catalog; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class PaimonCatalogFactory implements CatalogFactory { + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig) { + return new PaimonCatalog(catalogName, readonlyConfig); + } + + @Override + public String factoryIdentifier() { + return "Paimon"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + PaimonSinkConfig.WAREHOUSE, + PaimonSinkConfig.DATABASE, + PaimonSinkConfig.TABLE) + .optional( + PaimonSinkConfig.HDFS_SITE_PATH, + PaimonSinkConfig.SCHEMA_SAVE_MODE, + PaimonSinkConfig.DATA_SAVE_MODE) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java new file mode 100644 index 00000000000..bec66dbe3f2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.catalog; + +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.Options; + +import lombok.extern.slf4j.Slf4j; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE; + +@Slf4j +public class PaimonCatalogLoader implements Serializable { + private PaimonSinkConfig config; + + public PaimonCatalogLoader(PaimonSinkConfig config) { + this.config = config; + } + + public Catalog loadCatalog() { + // When using the seatunel engine, set the current class loader to prevent loading failures + Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader()); + final String warehouse = config.getWarehouse(); + final Map optionsMap = new HashMap<>(); + optionsMap.put(WAREHOUSE.key(), warehouse); + final Options options = Options.fromMap(optionsMap); + final Configuration hadoopConf = new Configuration(); + String hdfsSitePathOptional = config.getHdfsSitePath(); + if (StringUtils.isNotBlank(hdfsSitePathOptional)) { + hadoopConf.addResource(new Path(hdfsSitePathOptional)); + } + final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf); + return CatalogFactory.createCatalog(catalogContext); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java new file mode 100644 index 00000000000..55b18f79ab3 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonTable.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.catalog; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; + +import org.apache.paimon.table.Table; + +public interface PaimonTable { + Table getPaimonTable(TablePath tablePath) throws CatalogException, TableNotExistException; +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java index b5299d87559..0396e6223af 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java @@ -22,13 +22,14 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.source.SeaTunnelSource; +import java.io.Serializable; import java.util.List; /** * Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link * SeaTunnelSink}. */ -public class PaimonConfig { +public class PaimonConfig implements Serializable { public static final Option WAREHOUSE = Options.key("warehouse") @@ -36,6 +37,12 @@ public class PaimonConfig { .noDefaultValue() .withDescription("The warehouse path of paimon"); + public static final Option CATALOG_NAME = + Options.key("catalog_name") + .stringType() + .defaultValue("paimon") + .withDescription(" the iceberg catalog name"); + public static final Option DATABASE = Options.key("database") .stringType() diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java new file mode 100644 index 00000000000..589fd948167 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.SchemaSaveMode; + +import lombok.Getter; + +import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull; + +@Getter +public class PaimonSinkConfig extends PaimonConfig { + public static final Option SCHEMA_SAVE_MODE = + Options.key("schema_save_mode") + .enumType(SchemaSaveMode.class) + .defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST) + .withDescription("schema_save_mode"); + + public static final Option DATA_SAVE_MODE = + Options.key("data_save_mode") + .enumType(DataSaveMode.class) + .defaultValue(DataSaveMode.APPEND_DATA) + .withDescription("data_save_mode"); + + private String catalogName; + private String warehouse; + private String namespace; + private String table; + private String hdfsSitePath; + private SchemaSaveMode schemaSaveMode; + private DataSaveMode dataSaveMode; + + public PaimonSinkConfig(ReadonlyConfig readonlyConfig) { + this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME)); + this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE)); + this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE)); + this.table = checkArgumentNotNull(readonlyConfig.get(TABLE)); + this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH); + this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE); + this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE); + } + + protected T checkArgumentNotNull(T argument) { + checkNotNull(argument); + return argument; + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java index e0612ae5f22..b6aaf8bd1e8 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java @@ -21,7 +21,10 @@ import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.converter.TypeConverter; import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; @@ -31,11 +34,14 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimeType; import com.google.auto.service.AutoService; import lombok.extern.slf4j.Slf4j; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; @@ -51,9 +57,118 @@ public String identifier() { } @Override - public Column convert(DataType typeDefine) { - // todo compelete when need - return null; + public Column convert(DataType dataType) { + SeaTunnelDataType seaTunnelDataType; + switch (dataType.getTypeRoot()) { + case CHAR: + case VARCHAR: + seaTunnelDataType = BasicType.STRING_TYPE; + break; + case BOOLEAN: + seaTunnelDataType = BasicType.BOOLEAN_TYPE; + break; + case BINARY: + case VARBINARY: + seaTunnelDataType = PrimitiveByteArrayType.INSTANCE; + break; + case DECIMAL: + org.apache.paimon.types.DecimalType paimonDecimalType = + (org.apache.paimon.types.DecimalType) dataType; + seaTunnelDataType = + new DecimalType( + paimonDecimalType.getPrecision(), paimonDecimalType.getScale()); + break; + case TINYINT: + case SMALLINT: + seaTunnelDataType = BasicType.SHORT_TYPE; + break; + case INTEGER: + seaTunnelDataType = BasicType.INT_TYPE; + break; + case BIGINT: + seaTunnelDataType = BasicType.LONG_TYPE; + break; + case FLOAT: + seaTunnelDataType = BasicType.FLOAT_TYPE; + break; + case DOUBLE: + seaTunnelDataType = BasicType.DOUBLE_TYPE; + break; + case DATE: + seaTunnelDataType = LocalTimeType.LOCAL_DATE_TYPE; + break; + case TIME_WITHOUT_TIME_ZONE: + seaTunnelDataType = LocalTimeType.LOCAL_TIME_TYPE; + break; + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + seaTunnelDataType = LocalTimeType.LOCAL_DATE_TIME_TYPE; + break; + case ARRAY: + org.apache.paimon.types.ArrayType paimonArrayType = + (org.apache.paimon.types.ArrayType) dataType; + DataType paimonArrayTypeElementType = paimonArrayType.getElementType(); + switch (paimonArrayTypeElementType.getTypeRoot()) { + case TINYINT: + case SMALLINT: + seaTunnelDataType = ArrayType.SHORT_ARRAY_TYPE; + break; + case INTEGER: + seaTunnelDataType = ArrayType.INT_ARRAY_TYPE; + break; + case BIGINT: + seaTunnelDataType = ArrayType.LONG_ARRAY_TYPE; + break; + case FLOAT: + seaTunnelDataType = ArrayType.FLOAT_ARRAY_TYPE; + break; + case DOUBLE: + seaTunnelDataType = ArrayType.DOUBLE_ARRAY_TYPE; + break; + case CHAR: + case VARCHAR: + seaTunnelDataType = ArrayType.STRING_ARRAY_TYPE; + break; + case BOOLEAN: + seaTunnelDataType = ArrayType.BOOLEAN_ARRAY_TYPE; + break; + case VARBINARY: + case BINARY: + seaTunnelDataType = ArrayType.BYTE_ARRAY_TYPE; + break; + default: + throw CommonError.unsupportedDataType( + identifier(), dataType.asSQLString(), "seatunnel"); + } + break; + case MAP: + MapType paimonMapType = (MapType) dataType; + SeaTunnelDataType keySeaTunnelDataType = + convert(paimonMapType.getKeyType()).getDataType(); + SeaTunnelDataType valueSeaTunnelDataType = + convert(paimonMapType.getValueType()).getDataType(); + seaTunnelDataType = + new org.apache.seatunnel.api.table.type.MapType( + keySeaTunnelDataType, valueSeaTunnelDataType); + break; + case ROW: + RowType paimonRowType = (RowType) dataType; + List paimonRowTypeFields = paimonRowType.getFields(); + String[] fieldNames = new String[paimonRowTypeFields.size()]; + SeaTunnelDataType[] fieldTypes = + new SeaTunnelDataType[paimonRowTypeFields.size()]; + for (int i = 0; i < paimonRowTypeFields.size(); i++) { + DataField dataField = paimonRowTypeFields.get(i); + fieldNames[i] = dataField.name(); + fieldTypes[i] = convert(dataField.type()).getDataType(); + } + seaTunnelDataType = new SeaTunnelRowType(fieldNames, fieldTypes); + break; + default: + throw CommonError.unsupportedDataType( + identifier(), dataType.asSQLString(), "seatunnel"); + } + return PhysicalColumn.builder().dataType(seaTunnelDataType).build(); } @Override @@ -202,7 +317,7 @@ public DataType reconvert(Column column) { case NULL: return DataTypes.VARBINARY(0); default: - throw CommonError.convertToConnectorTypeError( + throw CommonError.unsupportedDataType( identifier(), column.getDataType().getSqlType().name(), column.getName()); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java new file mode 100644 index 00000000000..b479ebf14b0 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/handler/PaimonSaveModeHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.handler; + +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; +import org.apache.seatunnel.api.sink.SchemaSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; +import org.apache.seatunnel.connectors.seatunnel.paimon.sink.SupportLoadTable; + +import org.apache.paimon.table.Table; + +public class PaimonSaveModeHandler extends DefaultSaveModeHandler { + + private SupportLoadTable supportLoadTable; + private Catalog catalog; + private CatalogTable catalogTable; + + public PaimonSaveModeHandler( + SupportLoadTable supportLoadTable, + SchemaSaveMode schemaSaveMode, + DataSaveMode dataSaveMode, + Catalog catalog, + CatalogTable catalogTable, + String customSql) { + super(schemaSaveMode, dataSaveMode, catalog, catalogTable, customSql); + this.supportLoadTable = supportLoadTable; + this.catalog = catalog; + this.catalogTable = catalogTable; + } + + @Override + public void handleSchemaSaveMode() { + super.handleSchemaSaveMode(); + TablePath tablePath = catalogTable.getTablePath(); + Table paimonTable = ((PaimonCatalog) catalog).getPaimonTable(tablePath); + // load paimon table and set it into paimon sink + this.supportLoadTable.setLoadTable(paimonTable); + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java index a808f800023..dde86a0213d 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java @@ -17,53 +17,48 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.serialization.DefaultSerializer; import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SaveModeHandler; import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportSaveMode; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig; import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException; +import org.apache.seatunnel.connectors.seatunnel.paimon.handler.PaimonSaveModeHandler; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo; import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState; -import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.Schema; + import org.apache.paimon.table.Table; import com.google.auto.service.AutoService; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.DATABASE; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.HDFS_SITE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.TABLE; -import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE; +import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory; @AutoService(SeaTunnelSink.class) public class PaimonSink implements SeaTunnelSink< - SeaTunnelRow, PaimonSinkState, PaimonCommitInfo, PaimonAggregatedCommitInfo> { + SeaTunnelRow, + PaimonSinkState, + PaimonCommitInfo, + PaimonAggregatedCommitInfo>, + SupportSaveMode, + SupportLoadTable
{ private static final long serialVersionUID = 1L; @@ -75,37 +70,18 @@ public class PaimonSink private JobContext jobContext; - public PaimonSink(Config pluginConfig, CatalogTable catalogTable) { - // initialize paimon table - final String warehouse = pluginConfig.getString(WAREHOUSE.key()); - final String database = pluginConfig.getString(DATABASE.key()); - final String table = pluginConfig.getString(TABLE.key()); - final Map optionsMap = new HashMap<>(); - optionsMap.put(WAREHOUSE.key(), warehouse); - final Options options = Options.fromMap(optionsMap); - final Configuration hadoopConf = new Configuration(); - if (pluginConfig.hasPath(HDFS_SITE_PATH.key())) { - hadoopConf.addResource(new Path(pluginConfig.getString(HDFS_SITE_PATH.key()))); - } - final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf); - try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) { - Identifier identifier = Identifier.create(database, table); - // Auto create if not exists the database and table for paimon - catalog.createDatabase(database, true); - TableSchema tableSchema = catalogTable.getTableSchema(); - this.seaTunnelRowType = tableSchema.toPhysicalRowDataType(); - Schema paimonTableSchema = SchemaUtil.toPaimonSchema(tableSchema); - catalog.createTable(identifier, paimonTableSchema, true); - this.table = catalog.getTable(identifier); - // todo if source is cdc,need to check primary key of tableSchema - } catch (Exception e) { - String errorMsg = - String.format( - "Failed to create table [%s] from database [%s] on warehouse [%s]", - database, table, warehouse); - throw new PaimonConnectorException( - PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e); - } + private ReadonlyConfig readonlyConfig; + + private PaimonSinkConfig paimonSinkConfig; + + private CatalogTable catalogTable; + + public PaimonSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + this.readonlyConfig = readonlyConfig; + this.paimonSinkConfig = new PaimonSinkConfig(readonlyConfig); + this.catalogTable = catalogTable; + TableSchema tableSchema = catalogTable.getTableSchema(); + this.seaTunnelRowType = tableSchema.toPhysicalRowDataType(); } @Override @@ -145,4 +121,38 @@ public Optional> getCommitInfoSerializer() { public void setJobContext(JobContext jobContext) { this.jobContext = jobContext; } + + @Override + public Optional getSaveModeHandler() { + org.apache.seatunnel.api.table.factory.CatalogFactory catalogFactory = + discoverFactory( + Thread.currentThread().getContextClassLoader(), + org.apache.seatunnel.api.table.factory.CatalogFactory.class, + "Paimon"); + if (catalogFactory == null) { + throw new PaimonConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), + PluginType.SINK, + "Cannot find paimon catalog factory")); + } + org.apache.seatunnel.api.table.catalog.Catalog catalog = + catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), readonlyConfig); + catalog.open(); + return Optional.of( + new PaimonSaveModeHandler( + this, + paimonSinkConfig.getSchemaSaveMode(), + paimonSinkConfig.getDataSaveMode(), + catalog, + catalogTable, + null)); + } + + @Override + public void setLoadTable(Table table) { + this.table = table; + } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java index c4612e1387d..cb8075235e8 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java @@ -61,7 +61,7 @@ public OptionRule optionRule() { public TableSink createSink(TableSinkFactoryContext context) { Config pluginConfig = context.getOptions().toConfig(); CatalogTable catalogTable = renameCatalogTable(pluginConfig, context.getCatalogTable()); - return () -> new PaimonSink(pluginConfig, catalogTable); + return () -> new PaimonSink(context.getOptions(), catalogTable); } private CatalogTable renameCatalogTable(Config pluginConfig, CatalogTable catalogTable) { diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java new file mode 100644 index 00000000000..734762e23ca --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/SupportLoadTable.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.sink; + +public interface SupportLoadTable { + void setLoadTable(T table); +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java index aaa0f7407f9..c03a77149c9 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/SchemaUtil.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.PrimaryKey; import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper; import org.apache.paimon.schema.Schema; @@ -46,4 +47,8 @@ public static Schema toPaimonSchema(TableSchema tableSchema) { } return paiSchemaBuilder.build(); } + + public static SeaTunnelDataType toSeaTunnelType(DataType dataType) { + return PaimonTypeMapper.INSTANCE.convert(dataType).getDataType(); + } }