Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
gnehil committed Nov 15, 2023
1 parent 8d00893 commit fe56ae7
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DataSaveMode;

import lombok.Getter;
import lombok.Setter;
Expand All @@ -44,7 +43,6 @@
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SAVE_MODE_CREATE_TEMPLATE;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_BUFFER_COUNT;
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.SINK_BUFFER_SIZE;
Expand Down Expand Up @@ -93,7 +91,6 @@ public class DorisConfig implements Serializable {
private Properties streamLoadProps;

// create table option
private DataSaveMode saveMode;
private String createTableTemplate;

public static DorisConfig of(Config pluginConfig) {
Expand Down Expand Up @@ -134,7 +131,6 @@ public static DorisConfig of(ReadonlyConfig config) {
dorisConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));

// create table option
dorisConfig.setSaveMode(config.get(SAVE_MODE));
dorisConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));

return dorisConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;

import java.util.Collections;
import java.util.Map;

public interface DorisOptions {
Expand Down Expand Up @@ -213,16 +209,6 @@ public interface DorisOptions {
.withDescription("");

// create table

SingleChoiceOption<DataSaveMode> SAVE_MODE =
Options.key(SupportDataSaveMode.SAVE_MODE_KEY)
.singleChoice(
DataSaveMode.class,
Collections.singletonList(DataSaveMode.KEEP_SCHEMA_AND_DATA))
.noDefaultValue()
.withDescription(
"Table structure and data processing methods that already exist on the target end");

Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
.stringType()
Expand All @@ -241,8 +227,5 @@ public interface DorisOptions {
OptionRule.builder().required(FENODES, USERNAME, PASSWORD, TABLE_IDENTIFIER);

OptionRule.Builder CATALOG_RULE =
OptionRule.builder()
.required(FENODES, QUERY_PORT, USERNAME, PASSWORD)
.optional(SAVE_MODE)
.conditional(SAVE_MODE, DataSaveMode.KEEP_SCHEMA_AND_DATA);
OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,19 @@
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SaveModeHandler;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.sink.SupportSaveMode;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.doris.catalog.DorisCatalog;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
Expand All @@ -59,23 +57,20 @@
@AutoService(SeaTunnelSink.class)
public class DorisSink
implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo>,
SupportDataSaveMode {
SupportSaveMode {

private DorisConfig dorisConfig;
private SeaTunnelRowType seaTunnelRowType;
private String jobId;

private CatalogTable catalogTable;

private DataSaveMode saveMode;

public DorisSink() {}

public DorisSink(ReadonlyConfig config, CatalogTable catalogTable) {
this.dorisConfig = DorisConfig.of(config);
this.catalogTable = catalogTable;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
this.saveMode = dorisConfig.getSaveMode();
}

@Override
Expand Down Expand Up @@ -106,7 +101,6 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
+ catalogTable.getTableId().getTableName();
dorisConfig.setTableIdentifier(tableIdentifier);
}
this.saveMode = dorisConfig.getSaveMode();
}

@Override
Expand Down Expand Up @@ -170,28 +164,7 @@ public Optional<Serializer<DorisCommitInfo>> getAggregatedCommitInfoSerializer()
}

@Override
public DataSaveMode getUserConfigSaveMode() {
return saveMode;
}

@Override
public void handleSaveMode(DataSaveMode userConfigSaveMode) {
if (catalogTable != null && DataSaveMode.KEEP_SCHEMA_AND_DATA.equals(userConfigSaveMode)) {
try (DorisCatalog dorisCatalog =
new DorisCatalog(
"Doris",
dorisConfig.getFrontends(),
dorisConfig.getQueryPort(),
dorisConfig.getUsername(),
dorisConfig.getPassword(),
dorisConfig)) {
dorisCatalog.open();
TablePath tablePath = TablePath.of(dorisConfig.getTableIdentifier());
if (!dorisCatalog.databaseExists(tablePath.getDatabaseName())) {
dorisCatalog.createDatabase(tablePath, true);
}
dorisCatalog.createTable(tablePath, catalogTable, true);
}
}
public Optional<SaveModeHandler> getSaveModeHandler() {
return Optional.empty();
}
}

0 comments on commit fe56ae7

Please sign in to comment.