[Feature][CDC] Support tables without primary keys (with unique keys)
hailin0 authored Jul 26, 2023
1 parent 53a1f0c commit 32b7f2b
Showing 8 changed files with 181 additions and 49 deletions.
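
In short: before this change, the chunk splitters required a primary key and always used its first column as the split key. The commit adds default getPrimaryKey, getUniqueKeys, and getConstraintKeys methods to JdbcDataSourceDialect (built on java.sql.DatabaseMetaData), plus a getSplitColumn default in JdbcSourceChunkSplitter that tries primary-key columns first and falls back to unique-key columns, so a table with a unique key but no primary key can now be snapshotted incrementally.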
JdbcDataSourceDialect.java
@@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.cdc.base.dialect;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionFactory;
@@ -25,11 +27,23 @@
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfig> {

@@ -68,4 +82,90 @@ default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
@Override
JdbcSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig);

default Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableId tableId)
throws SQLException {

DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();

// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
ResultSet rs =
metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table());

// seq -> column name
List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
String pkName = null;
while (rs.next()) {
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
String columnName = rs.getString("COLUMN_NAME");
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
}
// sort by KEY_SEQ to restore the declared key column order
List<String> pkFields =
primaryKeyColumns.stream()
.sorted(Comparator.comparingInt(Pair::getKey))
.map(Pair::getValue)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(pkFields)) {
return Optional.empty();
}
return Optional.of(PrimaryKey.of(pkName, pkFields));
}

default List<ConstraintKey> getUniqueKeys(JdbcConnection jdbcConnection, TableId tableId)
throws SQLException {
return getConstraintKeys(jdbcConnection, tableId).stream()
.filter(
constraintKey ->
constraintKey.getConstraintType()
== ConstraintKey.ConstraintType.UNIQUE_KEY)
.collect(Collectors.toList());
}

default List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, TableId tableId)
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();

ResultSet resultSet =
metaData.getIndexInfo(
tableId.catalog(), tableId.schema(), tableId.table(), false, false);
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
if (columnName == null) {
continue;
}

String indexName = resultSet.getString("INDEX_NAME");
boolean nonUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
indexName,
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.KEY;
if (!nonUnique) {
constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
constraintType, indexName, new ArrayList<>());
});

ConstraintKey.ColumnSortType sortType =
"A".equals(resultSet.getString("ASC_OR_DESC"))
? ConstraintKey.ColumnSortType.ASC
: ConstraintKey.ColumnSortType.DESC;
ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
constraintKey.getColumnNames().add(constraintKeyColumn);
}
return new ArrayList<>(constraintKeyMap.values());
}
}
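
As an aside, here is a minimal self-contained sketch of why getPrimaryKey must re-sort by KEY_SEQ. It is not part of this commit: the class name is hypothetical and it assumes an in-memory H2 database on the classpath. DatabaseMetaData#getPrimaryKeys returns rows ordered by COLUMN_NAME, so a composite key declared as (b, a) comes back as [a, b] unless re-ordered.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Demo only: uses an H2 in-memory database, not part of the SeaTunnel codebase.
public class PrimaryKeyOrderDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
            conn.createStatement()
                    .execute("CREATE TABLE t (b INT, a INT, PRIMARY KEY (b, a))");
            DatabaseMetaData meta = conn.getMetaData();
            // KEY_SEQ is the 1-based position of the column within the key.
            List<SimpleEntry<Integer, String>> cols = new ArrayList<>();
            try (ResultSet rs = meta.getPrimaryKeys(null, null, "T")) {
                while (rs.next()) {
                    cols.add(new SimpleEntry<>(
                            rs.getInt("KEY_SEQ"), rs.getString("COLUMN_NAME")));
                }
            }
            // Without this sort the columns arrive alphabetically as [A, B].
            cols.sort(Comparator.comparingInt(e -> e.getKey()));
            // Prints B then A, matching the declared key order.
            cols.forEach(e -> System.out.println(e.getValue()));
        }
    }
}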
JdbcSourceChunkSplitter.java
@@ -17,16 +17,22 @@

package org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;

/** The {@code ChunkSplitter} used to split a table into a set of chunks for a JDBC data source. */
public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@@ -161,4 +167,42 @@ default SeaTunnelRowType getSplitType(Column splitColumn) {
new String[] {splitColumn.name()},
new SeaTunnelDataType[] {fromDbzColumn(splitColumn)});
}

default Column getSplitColumn(
JdbcConnection jdbc, JdbcDataSourceDialect dialect, TableId tableId)
throws SQLException {
Optional<PrimaryKey> primaryKey = dialect.getPrimaryKey(jdbc, tableId);
if (primaryKey.isPresent()) {
List<String> pkColumns = primaryKey.get().getColumnNames();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (String pkColumn : pkColumns) {
Column column = table.columnWithName(pkColumn);
if (isEvenlySplitColumn(column)) {
return column;
}
}
}

List<ConstraintKey> uniqueKeys = dialect.getUniqueKeys(jdbc, tableId);
if (!uniqueKeys.isEmpty()) {
Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
for (ConstraintKey uniqueKey : uniqueKeys) {
List<ConstraintKey.ConstraintKeyColumn> uniqueKeyColumns =
uniqueKey.getColumnNames();
for (ConstraintKey.ConstraintKeyColumn uniqueKeyColumn : uniqueKeyColumns) {
Column column = table.columnWithName(uniqueKeyColumn.getColumnName());
if (isEvenlySplitColumn(column)) {
return column;
}
}
}
}

throw new UnsupportedOperationException(
String.format(
"Incremental snapshot for tables requires a primary key or unique key,"
+ " but table %s doesn't have one.",
tableId));
}
}
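
Design note: primary-key columns are preferred because they are always non-null; unique-key columns are consulted only when no evenly-splittable primary-key column exists, and the first evenly-splittable column found wins. Two caveats worth knowing: getUniqueKeys is backed by a HashMap in getConstraintKeys above, so when several unique indexes qualify, the choice between them is not deterministic; and unlike a primary key, a unique-key column may be nullable, which the splitter does not check here.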
(file name not shown; a connector-specific chunk splitter)
@@ -33,7 +33,6 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.math.BigDecimal;
@@ -67,8 +66,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
LOG.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = getSplitColumn(table);
Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -393,18 +391,4 @@ private static void maySleep(int count, TableId tableId) {
LOG.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}

public static Column getSplitColumn(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}

// use first field in primary key as the split key
return primaryKeys.get(0);
}
}
SqlServerDialect.java
@@ -19,7 +19,6 @@

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -47,7 +46,7 @@
public class SqlServerDialect implements JdbcDataSourceDialect {

private static final long serialVersionUID = 1L;
private final SourceConfig sourceConfig;
private final SqlServerSourceConfig sourceConfig;

private transient SqlServerSchema sqlServerSchema;

@@ -95,7 +94,7 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (sqlServerSchema == null) {
sqlServerSchema = new SqlServerSchema();
sqlServerSchema = new SqlServerSchema(sourceConfig.getDbzConnectorConfig());
}
return sqlServerSchema.getTableSchema(jdbc, tableId);
}
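
Narrowing the field type from SourceConfig to SqlServerSourceConfig is what allows queryTableSchema to hand the Debezium connector config (sourceConfig.getDbzConnectorConfig()) to the new SqlServerSchema constructor, shown in the SqlServerSchema.java diff below.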
(file name not shown; a second connector-specific chunk splitter)
@@ -30,7 +30,6 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;

@@ -64,8 +63,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
log.info("Start splitting table {} into chunks...", tableId);
long start = System.currentTimeMillis();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = getSplitColumn(table);
Column splitColumn = getSplitColumn(jdbc, dialect, tableId);
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -390,18 +388,4 @@ private static void maySleep(int count, TableId tableId) {
log.info("JdbcSourceChunkSplitter has split {} chunks for table {}", count, tableId);
}
}

public static Column getSplitColumn(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}

// use first field in primary key as the split key
return primaryKeys.get(0);
}
}
SqlServerSchema.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;

import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@@ -29,18 +30,18 @@

import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** A component used to get schema by table path. */
public class SqlServerSchema {

private final SqlServerConnectorConfig connectorConfig;
private final Map<TableId, TableChange> schemasByTableId;

public SqlServerSchema() {
public SqlServerSchema(SqlServerConnectorConfig connectorConfig) {
this.schemasByTableId = new ConcurrentHashMap<>();
this.connectorConfig = connectorConfig;
}

public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
@@ -55,16 +56,17 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {

private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
Set<TableId> tableIdSet = new HashSet<>();
tableIdSet.add(tableId);

final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
Tables tables = new Tables();
tables.overwriteTable(tables.editOrCreateTable(tableId).create());

try {
sqlServerConnection.readSchema(
tables, tableId.catalog(), tableId.schema(), null, null, false);
tables,
tableId.catalog(),
tableId.schema(),
connectorConfig.getTableFilters().dataCollectionFilter(),
null,
false);
Table table = tables.forTable(tableId);
TableChange tableChange = new TableChange(TableChanges.TableChangeType.CREATE, table);
tableChangeMap.put(tableId, tableChange);
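
The practical effect: readSchema previously passed null as the table filter and therefore read metadata for every table in the schema; it now passes connectorConfig.getTableFilters().dataCollectionFilter(), so only tables captured by the connector are examined when a single table's schema is resolved.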
(file name not shown; a JDBC catalog's getConstraintKeys implementation)
@@ -180,18 +180,20 @@ protected List<ConstraintKey> getConstraintKeys(
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
String indexName = resultSet.getString("INDEX_NAME");
String columnName = resultSet.getString("COLUMN_NAME");
String unique = resultSet.getString("NON_UNIQUE");
if (columnName == null) {
continue;
}
String indexName = resultSet.getString("INDEX_NAME");
boolean nonUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
indexName,
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.KEY;
// 0 is unique.
if ("0".equals(unique)) {
if (!nonUnique) {
constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
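
This mirrors the new JdbcDataSourceDialect default: the null COLUMN_NAME guard (the JDBC Javadoc notes that tableIndexStatistic rows have no column name) now runs before the other fields are read, and NON_UNIQUE, which the DatabaseMetaData Javadoc defines as a boolean column, is read with getBoolean rather than compared to the string "0".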
(diff for the eighth changed file did not load)
