From 59730d0a6ea09fbfe6da2c6db4b816cf8c455d22 Mon Sep 17 00:00:00 2001 From: XenosK Date: Wed, 13 Nov 2024 10:47:15 +0800 Subject: [PATCH] [Feature][Connector-V2] update unique column function --- .../base/dialect/JdbcDataSourceDialect.java | 21 ------------------- .../AbstractJdbcSourceChunkSplitter.java | 10 ++++----- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java index 923a8c0b76e..05e9a89c04c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java @@ -26,7 +26,6 @@ import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import io.debezium.jdbc.JdbcConnection; @@ -120,26 +119,6 @@ default List getUniqueKeys(JdbcConnection jdbcConnection, TableId .collect(Collectors.toList()); } - default boolean isUniqueKey(JdbcConnection jdbcConnection, TableId tableId, String columnName) - throws SQLException { - boolean isUnique = false; - if (StringUtils.isNotEmpty(columnName)) { - DatabaseMetaData metaData = jdbcConnection.connection().getMetaData(); - ResultSet resultSet = - metaData.getIndexInfo( - tableId.catalog(), tableId.schema(), tableId.table(), false, false); - - while (resultSet.next()) { - if (columnName.equalsIgnoreCase(resultSet.getString("COLUMN_NAME")) - && !resultSet.getBoolean("NON_UNIQUE")) { - isUnique = true; - break; - } - } - } - return isUnique; - } - default List getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId) throws SQLException { DatabaseMetaData metaData = jdbcConnection.connection().getMetaData(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java index b16ecf1d3f5..32b78df6b65 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java @@ -391,27 +391,27 @@ protected Column getSplitColumn( try { splitColumnsConfig = sourceConfig.getSplitColumn(); } catch (Exception e) { - log.error("Config snapshot.split.column get exception in {}:{}", tableId, e); + log.error("Config snapshotSplitColumn get exception in {}:{}", tableId, e); } String tableSc = splitColumnsConfig.getOrDefault(tableId.catalog() + "." + tableId.table(), null); if (StringUtils.isNotEmpty(tableSc)) { - boolean isUniqueKey = dialect.isUniqueKey(jdbc, tableId, tableSc); + boolean isUniqueKey = dialect.getUniqueKeys(jdbc, tableId).contains(tableSc); if (isUniqueKey) { Column column = table.columnWithName(tableSc); if (isEvenlySplitColumn(column)) { return column; } else { log.warn( - "Config snapshot.split.column type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING", + "Config snapshotSplitColumn type in {} is not TINYINT、SMALLINT、INT、BIGINT、DECIMAL、STRING", tableId); } } else { - log.warn("Config snapshot.split.column not unique key for table {}", tableId); + log.warn("Config snapshotSplitColumn not unique key for table {}", tableId); } } else { - log.info("Config snapshot.split.column not exists for table {}", tableId); + log.info("Config snapshotSplitColumn not exists for table {}", tableId); } Optional primaryKey = dialect.getPrimaryKey(jdbc, tableId);