
[Feature][Paimon] Support the streaming mode for sink #6342
[Feature][Paimon] Support cdc write of paimon sink #6427
[Feature][Paimon] Support auto create db and table for paimon sink
dailai committed Mar 6, 2024
1 parent 7b48a16 commit 23dbea9
Showing 15 changed files with 886 additions and 38 deletions.
org/apache/seatunnel/connectors/seatunnel/paimon/data/PaimonTypeMapper.java (new file)
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.paimon.data;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

import java.util.concurrent.atomic.AtomicInteger;

public class PaimonTypeMapper {
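    // Monotonically increasing id generator; Paimon requires a unique field id
    // for each nested DataField.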
private static final AtomicInteger fieldId = new AtomicInteger(-1);

public static DataType toPaimonType(SeaTunnelDataType dataType) {
switch (dataType.getSqlType()) {
case BOOLEAN:
return DataTypes.BOOLEAN();
case BYTES:
return DataTypes.BYTES();
case SMALLINT:
return DataTypes.SMALLINT();
case TINYINT:
return DataTypes.TINYINT();
case INT:
return DataTypes.INT();
case BIGINT:
return DataTypes.BIGINT();
case FLOAT:
return DataTypes.FLOAT();
case DOUBLE:
return DataTypes.DOUBLE();
case DECIMAL:
DecimalType decimalType = (DecimalType) dataType;
return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale());
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
// convert the element type recursively
DataType elementType = toPaimonType(arrayType.getElementType());
return DataTypes.ARRAY(elementType);
case MAP:
org.apache.seatunnel.api.table.type.MapType mapType =
(org.apache.seatunnel.api.table.type.MapType) dataType;
DataType keyType = toPaimonType(mapType.getKeyType());
DataType valueType = toPaimonType(mapType.getValueType());
return DataTypes.MAP(keyType, valueType);
case ROW:
SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) dataType;
DataField[] dataFields = new DataField[seaTunnelRowType.getTotalFields()];
for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
String field = seaTunnelRowType.getFieldName(i);
SeaTunnelDataType fieldType = seaTunnelRowType.getFieldType(i);
int id = fieldId.incrementAndGet();
dataFields[i] = new DataField(id, field, toPaimonType(fieldType));
}
return DataTypes.ROW(dataFields);
case DATE:
return DataTypes.DATE();
case TIME:
return DataTypes.TIME();
case TIMESTAMP:
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
case STRING:
default:
return DataTypes.STRING();
}
}
}
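
A minimal usage sketch of the mapper above (not part of this commit; the field names are hypothetical): a two-field SeaTunnel row schema converts to a Paimon ROW type, with nested fields receiving generated ids.

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.paimon.types.DataType;

public class PaimonTypeMapperExample {
    public static void main(String[] args) {
        // Hypothetical two-field schema: BIGINT id, DOUBLE score.
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "score"},
                        new SeaTunnelDataType[] {BasicType.LONG_TYPE, BasicType.DOUBLE_TYPE});
        // Produces a Paimon ROW<BIGINT, DOUBLE>.
        DataType paimonType = PaimonTypeMapper.toPaimonType(rowType);
        System.out.println(paimonType);
    }
}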
org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSink.java
@@ -19,14 +19,15 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -38,6 +39,7 @@
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -46,6 +48,7 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.Table;

import com.google.auto.service.AutoService;
@@ -72,18 +75,11 @@ public class PaimonSink

private SeaTunnelRowType seaTunnelRowType;

private Config pluginConfig;

private Table table;

@Override
public String getPluginName() {
return PLUGIN_NAME;
}
private JobContext jobContext;

@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
this.pluginConfig = pluginConfig;
public PaimonSink(Config pluginConfig, CatalogTable catalogTable) {
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig, WAREHOUSE.key(), DATABASE.key(), TABLE.key());
@@ -108,43 +104,45 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
try (Catalog catalog = CatalogFactory.createCatalog(catalogContext)) {
Identifier identifier = Identifier.create(database, table);
// Auto-create the database and table in Paimon if they do not exist
catalog.createDatabase(database, true);
TableSchema tableSchema = catalogTable.getTableSchema();
this.seaTunnelRowType = tableSchema.toPhysicalRowDataType();
Schema paimonTableSchema = SchemaUtil.toPaimonSchema(tableSchema);
catalog.createTable(identifier, paimonTableSchema, true);
this.table = catalog.getTable(identifier);
// TODO: if the source is CDC, check that tableSchema defines a primary key
} catch (Exception e) {
String errorMsg =
String.format(
"Failed to get table [%s] from database [%s] on warehouse [%s]",
"Failed to create table [%s] from database [%s] on warehouse [%s]",
database, table, warehouse);
throw new PaimonConnectorException(
PaimonConnectorErrorCode.GET_TABLE_FAILED, errorMsg, e);
}
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> createWriter(
SinkWriter.Context context) throws IOException {
return new PaimonSinkWriter(context, table, seaTunnelRowType);
return new PaimonSinkWriter(context, table, seaTunnelRowType, jobContext);
}

@Override
public Optional<SinkAggregatedCommitter<PaimonCommitInfo, PaimonAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
return Optional.of(new PaimonAggregatedCommitter(table));
return Optional.of(new PaimonAggregatedCommitter(table, jobContext));
}

@Override
public SinkWriter<SeaTunnelRow, PaimonCommitInfo, PaimonSinkState> restoreWriter(
SinkWriter.Context context, List<PaimonSinkState> states) throws IOException {
return new PaimonSinkWriter(context, table, seaTunnelRowType, states);
return new PaimonSinkWriter(context, table, seaTunnelRowType, states, jobContext);
}

@Override
@@ -156,4 +154,9 @@ public Optional<Serializer<PaimonAggregatedCommitInfo>> getAggregatedCommitInfoS
public Optional<Serializer<PaimonCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public void setJobContext(JobContext jobContext) {
this.jobContext = jobContext;
}
}
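
The constructor above delegates schema conversion to SchemaUtil.toPaimonSchema, a file this commit adds but which is outside this excerpt. A plausible shape, assuming SeaTunnel's TableSchema exposes its columns and primary key (the real implementation may differ):

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.connectors.seatunnel.paimon.data.PaimonTypeMapper;

import org.apache.paimon.schema.Schema;

public class SchemaUtil {
    public static Schema toPaimonSchema(TableSchema tableSchema) {
        Schema.Builder builder = Schema.newBuilder();
        // Map every SeaTunnel column onto a Paimon column of the converted type.
        for (Column column : tableSchema.getColumns()) {
            builder.column(column.getName(), PaimonTypeMapper.toPaimonType(column.getDataType()));
        }
        // Carry the primary key over so CDC upserts have a key to merge on.
        if (tableSchema.getPrimaryKey() != null) {
            builder.primaryKey(tableSchema.getPrimaryKey().getColumnNames());
        }
        return builder.build();
    }
}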
org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -17,9 +17,14 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import com.google.auto.service.AutoService;
@@ -41,4 +46,11 @@ public OptionRule optionRule() {
.optional(PaimonConfig.HDFS_SITE_PATH)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
Config pluginConfig = context.getOptions().toConfig();
CatalogTable catalogTable = context.getCatalogTable();
return () -> new PaimonSink(pluginConfig, catalogTable);
}
}
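
The keys validated by optionRule and checkAllExists come from PaimonConfig, which is not shown in this excerpt. The options are presumably declared with SeaTunnel's Options DSL, roughly as follows (assumed declarations; the descriptions are illustrative only):

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class PaimonConfig {
    public static final Option<String> WAREHOUSE =
            Options.key("warehouse")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The warehouse path of the Paimon catalog");

    public static final Option<String> DATABASE =
            Options.key("database")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The database to write to");

    public static final Option<String> TABLE =
            Options.key("table")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The table to write to");
}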
org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -17,21 +17,27 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.sink;

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.commit.PaimonCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.paimon.sink.state.PaimonSinkState;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.JobContextUtil;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableWrite;
import org.apache.paimon.table.sink.WriteBuilder;

import lombok.extern.slf4j.Slf4j;

@@ -50,9 +56,9 @@ public class PaimonSinkWriter

private String commitUser = UUID.randomUUID().toString();

private final BatchWriteBuilder tableWriteBuilder;
private final WriteBuilder tableWriteBuilder;

private final BatchTableWrite tableWrite;
private final TableWrite tableWrite;

private long checkpointId = 0;

@@ -64,37 +70,58 @@ public class PaimonSinkWriter

private final SinkWriter.Context context;

public PaimonSinkWriter(Context context, Table table, SeaTunnelRowType seaTunnelRowType) {
private final JobContext jobContext;

public PaimonSinkWriter(
Context context,
Table table,
SeaTunnelRowType seaTunnelRowType,
JobContext jobContext) {
this.table = table;
this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
this.tableWriteBuilder =
JobContextUtil.isBatchJob(jobContext)
? this.table.newBatchWriteBuilder().withOverwrite()
: this.table.newStreamWriteBuilder();
this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.jobContext = jobContext;
}

public PaimonSinkWriter(
Context context,
Table table,
SeaTunnelRowType seaTunnelRowType,
List<PaimonSinkState> states) {
List<PaimonSinkState> states,
JobContext jobContext) {
this.table = table;
this.tableWriteBuilder = this.table.newBatchWriteBuilder().withOverwrite();
this.tableWriteBuilder =
JobContextUtil.isBatchJob(jobContext)
? this.table.newBatchWriteBuilder().withOverwrite()
: this.table.newStreamWriteBuilder();
this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
this.jobContext = jobContext;
if (Objects.isNull(states) || states.isEmpty()) {
return;
}
this.commitUser = states.get(0).getCommitUser();
this.checkpointId = states.get(0).getCheckpointId();
try (BatchTableCommit tableCommit = tableWriteBuilder.newCommit()) {
try (TableCommit tableCommit = tableWriteBuilder.newCommit()) {
List<CommitMessage> commitables =
states.stream()
.map(PaimonSinkState::getCommittables)
.flatMap(List::stream)
.collect(Collectors.toList());
log.info("Trying to recommit states {}", commitables);
tableCommit.commit(commitables);
if (JobContextUtil.isBatchJob(jobContext)) {
log.debug("Trying to recommit states batch mode");
((BatchTableCommit) tableCommit).commit(commitables);
} else {
log.debug("Trying to recommit states streaming mode");
((StreamTableCommit) tableCommit).commit(Objects.hash(commitables), commitables);
}
} catch (Exception e) {
throw new PaimonConnectorException(
PaimonConnectorErrorCode.TABLE_WRITE_COMMIT_FAILED, e);
@@ -117,7 +144,13 @@ public void write(SeaTunnelRow element) throws IOException {
@Override
public Optional<PaimonCommitInfo> prepareCommit() throws IOException {
try {
List<CommitMessage> fileCommittables = tableWrite.prepareCommit();
List<CommitMessage> fileCommittables;
if (JobContextUtil.isBatchJob(jobContext)) {
fileCommittables = ((BatchTableWrite) tableWrite).prepareCommit();
} else {
fileCommittables =
((StreamTableWrite) tableWrite).prepareCommit(false, committables.size());
}
committables.addAll(fileCommittables);
return Optional.of(new PaimonCommitInfo(fileCommittables));
} catch (Exception e) {
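
The batch/stream branching in the writer hinges on JobContextUtil.isBatchJob, another file added by this commit but outside the excerpt. Assuming JobContext exposes its JobMode, a likely shape is:

import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.common.constants.JobMode;

public class JobContextUtil {
    // Batch jobs keep the overwrite-capable BatchWriteBuilder; streaming jobs
    // switch to Paimon's StreamWriteBuilder and incremental commits.
    public static boolean isBatchJob(JobContext jobContext) {
        return JobMode.BATCH.equals(jobContext.getJobMode());
    }
}

Note that in the streaming branch, StreamTableWrite.prepareCommit(boolean waitCompaction, long commitIdentifier) requires a commit identifier; the writer passes committables.size(), which grows as commit messages accumulate across checkpoints.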
