Skip to content

Commit

Permalink
[Feature][Connector-V2] update unique column function
Browse files Browse the repository at this point in the history
  • Loading branch information
XenosK committed Nov 13, 2024
1 parent 307a617 commit 59730d0
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import io.debezium.jdbc.JdbcConnection;
Expand Down Expand Up @@ -120,26 +119,6 @@ default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId
.collect(Collectors.toList());
}

default boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName)
throws SQLException {
boolean isUnique = false;
if (StringUtils.isNotEmpty(columnName)) {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
ResultSet resultSet =
metaData.getIndexInfo(
tableId.catalog(), tableId.schema(), tableId.table(), false, false);

while (resultSet.next()) {
if (columnName.equalsIgnoreCase(resultSet.getString("COLUMN_NAME"))
&& !resultSet.getBoolean("NON_UNIQUE")) {
isUnique = true;
break;
}
}
}
return isUnique;
}

default List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId)
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,27 +391,27 @@ protected Column getSplitColumn(
try {
splitColumnsConfig = sourceConfig.getSplitColumn();
} catch (Exception e) {
log.error("Config snapshot.split.column get exception in {}:{}", tableId, e);
log.error("Config snapshotSplitColumn get exception in {}:{}", tableId, e);
}
String tableSc =
splitColumnsConfig.getOrDefault(tableId.catalog() + "." + tableId.table(), null);

if (StringUtils.isNotEmpty(tableSc)) {
boolean isUniqueKey = dialect.isUniqueKey(jdbc, tableId, tableSc);
boolean isUniqueKey = dialect.getUniqueKeys(jdbc, tableId).contains(tableSc);
if (isUniqueKey) {
Column column = table.columnWithName(tableSc);
if (isEvenlySplitColumn(column)) {
return column;
} else {
log.warn(
"Config snapshot.split.column type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
"Config snapshotSplitColumn type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING",
tableId);
}
} else {
log.warn("Config snapshot.split.column not unique key for table {}", tableId);
log.warn("Config snapshotSplitColumn not unique key for table {}", tableId);
}
} else {
log.info("Config snapshot.split.column not exists for table {}", tableId);
log.info("Config snapshotSplitColumn not exists for table {}", tableId);
}

Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
Expand Down

0 comments on commit 59730d0

Please sign in to comment.