From 46e8d24b82e5f9593588b2f0165a743d3d9e6235 Mon Sep 17 00:00:00 2001 From: Xiaojian Sun Date: Sun, 21 May 2023 22:18:23 +0800 Subject: [PATCH 1/2] [hotfix] JDBC Source lost data,when the data of partition_column is null. --- .../converter/AbstractJdbcRowConverter.java | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java index 49ac946acc8..92246c4e85f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java @@ -51,46 +51,47 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S int resultSetIndex = fieldIndex + 1; switch (seaTunnelDataType.getSqlType()) { case STRING: - fields[fieldIndex] = rs.getString(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, String.class); break; case BOOLEAN: - fields[fieldIndex] = rs.getBoolean(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Boolean.class); break; case TINYINT: - fields[fieldIndex] = rs.getByte(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Byte.class); break; case SMALLINT: - fields[fieldIndex] = rs.getShort(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Short.class); break; case INT: - fields[fieldIndex] = rs.getInt(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Integer.class); break; case BIGINT: - fields[fieldIndex] = rs.getLong(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Long.class); break; case FLOAT: - fields[fieldIndex] = rs.getFloat(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Float.class); break; case DOUBLE: - fields[fieldIndex] = rs.getDouble(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Double.class); break; case DECIMAL: - fields[fieldIndex] = rs.getBigDecimal(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, BigDecimal.class); break; case DATE: - Date sqlDate = rs.getDate(resultSetIndex); fields[fieldIndex] = - Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); + Optional.ofNullable(rs.getObject(resultSetIndex, Date.class)) + .map(e -> e.toLocalDate()) + .orElse(null); break; case TIME: - Time sqlTime = rs.getTime(resultSetIndex); fields[fieldIndex] = - Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null); + Optional.ofNullable(rs.getObject(resultSetIndex, Time.class)) + .map(e -> e.toLocalTime()) + .orElse(null); break; case TIMESTAMP: - Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex); fields[fieldIndex] = - Optional.ofNullable(sqlTimestamp) + Optional.ofNullable(rs.getObject(resultSetIndex, Timestamp.class)) .map(e -> e.toLocalDateTime()) .orElse(null); break; From 4f6b7dcb42d32973469f8db3a70eec6ec7871bf1 Mon Sep 17 00:00:00 2001 From: choucmei Date: Tue, 10 Oct 2023 22:47:11 +0800 Subject: [PATCH 2/2] add test --- .../psql/PostgresJdbcRowConverter.java | 32 +++-- .../seatunnel/jdbc/AbstractJdbcIT.java | 5 +- .../seatunnel/jdbc/JdbcITErrorCode.java | 1 + .../seatunnel/jdbc/JdbcMysqlIT.java | 136 +++++++++++------- .../seatunnel/jdbc/JdbcOracleIT.java | 64 ++++++--- .../jdbc/JdbcPostgresIdentifierIT.java | 72 +++++++++- .../resources/jdbc_mysql_source_and_sink.conf | 13 +- .../jdbc_mysql_source_and_sink_parallel.conf | 14 +- ..._source_and_sink_parallel_upper_lower.conf | 17 ++- 9 files changed, 245 insertions(+), 109 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java index 13ace89a046..226e714aa5b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import java.math.BigDecimal; import java.sql.Date; import java.sql.ResultSet; import java.sql.SQLException; @@ -59,47 +60,48 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S ? null : rs.getObject(resultSetIndex).toString(); } else { - fields[fieldIndex] = rs.getString(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, String.class); } break; case BOOLEAN: - fields[fieldIndex] = rs.getBoolean(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Boolean.class); break; case TINYINT: - fields[fieldIndex] = rs.getByte(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Byte.class); break; case SMALLINT: - fields[fieldIndex] = rs.getShort(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Short.class); break; case INT: - fields[fieldIndex] = rs.getInt(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Integer.class); break; case BIGINT: - fields[fieldIndex] = rs.getLong(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Long.class); break; case FLOAT: - fields[fieldIndex] = rs.getFloat(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Float.class); break; case DOUBLE: - fields[fieldIndex] = rs.getDouble(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, Double.class); break; case DECIMAL: - fields[fieldIndex] = rs.getBigDecimal(resultSetIndex); + fields[fieldIndex] = rs.getObject(resultSetIndex, BigDecimal.class); break; case DATE: - Date sqlDate = rs.getDate(resultSetIndex); fields[fieldIndex] = - Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null); + Optional.ofNullable(rs.getObject(resultSetIndex, Date.class)) + .map(e -> e.toLocalDate()) + .orElse(null); break; case TIME: - Time sqlTime = rs.getTime(resultSetIndex); fields[fieldIndex] = - Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null); + Optional.ofNullable(rs.getObject(resultSetIndex, Time.class)) + .map(e -> e.toLocalTime()) + .orElse(null); break; case TIMESTAMP: - Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex); fields[fieldIndex] = - Optional.ofNullable(sqlTimestamp) + Optional.ofNullable(rs.getObject(resultSetIndex, Timestamp.class)) .map(e -> e.toLocalDateTime()) .orElse(null); break; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index a38fb2217f2..c85f0e55d89 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -257,10 +257,9 @@ public void testJdbcDb(TestContainer container) for (String configFile : configFiles) { Container.ExecResult execResult = container.executeJob(configFile); Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + compareResult(); + clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable()); } - - compareResult(); - clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable()); } protected void initCatalog() {} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java index 965fb7ba85f..4ed8b076c31 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcITErrorCode.java @@ -25,6 +25,7 @@ public enum JdbcITErrorCode implements SeaTunnelErrorCode { CREATE_TABLE_FAILED("JDBC-IT-02", "Fail to create table."), INSERT_DATA_FAILED("JDBC-IT-03", "Fail to inert data."), DRIVER_NOT_FOUND("JDBC-IT-04", "Can not get the driver."), + QUERY_TABLE_FAILED("JDBC-IT-05", "Fail to query table."), ; private final String code; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java index b10aa0c2225..e76614eb374 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMysqlIT.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Assertions; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MySQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -34,7 +35,9 @@ import com.google.common.collect.Lists; import java.math.BigDecimal; -import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; @@ -67,6 +70,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT { private static final String CREATE_SQL = "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + + " `id` bigint(18) NOT NULL AUTO_INCREMENT,\n" + " `c_bit_1` bit(1) DEFAULT NULL,\n" + " `c_bit_8` bit(8) DEFAULT NULL,\n" + " `c_bit_16` bit(16) DEFAULT NULL,\n" @@ -110,9 +114,58 @@ public class JdbcMysqlIT extends AbstractJdbcIT { + " `c_integer_unsigned` int(10) unsigned DEFAULT NULL,\n" + " `c_bigint_30` BIGINT(40) unsigned DEFAULT NULL,\n" + " `c_decimal_unsigned_30` DECIMAL(30) unsigned DEFAULT NULL,\n" - + " `c_decimal_30` DECIMAL(30) DEFAULT NULL\n" + + " `c_decimal_30` DECIMAL(30) DEFAULT NULL,\n" + + " PRIMARY KEY(`id`)" + ");"; + private static final String[] fieldNames = + new String[] { + "c_bit_1", + "c_bit_8", + "c_bit_16", + "c_bit_32", + "c_bit_64", + "c_boolean", + "c_tinyint", + "c_tinyint_unsigned", + "c_smallint", + "c_smallint_unsigned", + "c_mediumint", + "c_mediumint_unsigned", + "c_int", + "c_integer", + "c_year", + "c_int_unsigned", + "c_integer_unsigned", + "c_bigint", + "c_bigint_unsigned", + "c_decimal", + "c_decimal_unsigned", + "c_float", + "c_float_unsigned", + "c_double", + "c_double_unsigned", + "c_char", + "c_tinytext", + "c_mediumtext", + "c_text", + "c_varchar", + "c_json", + "c_longtext", + "c_date", + "c_datetime", + "c_timestamp", + "c_tinyblob", + "c_mediumblob", + "c_blob", + "c_longblob", + "c_varbinary", + "c_binary", + "c_bigint_30", + "c_decimal_unsigned_30", + "c_decimal_30", + }; + @Override JdbcCase getJdbcCase() { Map containerEnv = new HashMap<>(); @@ -147,7 +200,25 @@ JdbcCase getJdbcCase() { } @Override - void compareResult() {} + void compareResult() { + try (Statement statement = connection.createStatement()) { + ResultSet resultSet = + statement.executeQuery( + String.format( + "select count(1) from %s.%s where c_bit_1 is null and c_bit_8 is null and c_bit_16 is null and c_bit_32" + + " is null and c_bit_64 is null and c_boolean is null and c_tinyint is null and c_tinyint_unsigned is null and c_smallint is null and c_smallint_unsigned is null " + + "and c_mediumint is null and c_mediumint_unsigned is null and c_int is null and c_integer is null and c_bigint is null and c_bigint_unsigned is null and c_decimal is null " + + "and c_decimal_unsigned is null and c_float is null and c_float_unsigned is null and c_double is null and c_double_unsigned is null and c_char is null and c_tinytext is null " + + "and c_mediumtext is null and c_text is null and c_varchar is null and c_json is null and c_longtext is null and c_date is null and c_datetime is null and c_timestamp is null " + + "and c_tinyblob is null and c_mediumblob is null and c_blob is null and c_longblob is null and c_varbinary is null and c_binary is null and c_year is null " + + "and c_int_unsigned is null and c_integer_unsigned is null and c_bigint_30 is null and c_decimal_unsigned_30 is null and c_decimal_30 is null", + getJdbcCase().getDatabase(), getJdbcCase().getSinkTable())); + resultSet.next(); + Assertions.assertEquals(resultSet.getInt(1), 1); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } @Override String driverUrl() { @@ -156,54 +227,6 @@ String driverUrl() { @Override Pair> initTestData() { - String[] fieldNames = - new String[] { - "c_bit_1", - "c_bit_8", - "c_bit_16", - "c_bit_32", - "c_bit_64", - "c_boolean", - "c_tinyint", - "c_tinyint_unsigned", - "c_smallint", - "c_smallint_unsigned", - "c_mediumint", - "c_mediumint_unsigned", - "c_int", - "c_integer", - "c_year", - "c_int_unsigned", - "c_integer_unsigned", - "c_bigint", - "c_bigint_unsigned", - "c_decimal", - "c_decimal_unsigned", - "c_float", - "c_float_unsigned", - "c_double", - "c_double_unsigned", - "c_char", - "c_tinytext", - "c_mediumtext", - "c_text", - "c_varchar", - "c_json", - "c_longtext", - "c_date", - "c_datetime", - "c_timestamp", - "c_tinyblob", - "c_mediumblob", - "c_blob", - "c_longblob", - "c_varbinary", - "c_binary", - "c_bigint_30", - "c_decimal_unsigned_30", - "c_decimal_30", - }; - List rows = new ArrayList<>(); BigDecimal bigintValue = new BigDecimal("2844674407371055000"); BigDecimal decimalValue = new BigDecimal("999999999999999999999999999899"); @@ -247,7 +270,7 @@ Pair> initTestData() { String.format("f1_%s", i), String.format("{\"aa\":\"bb_%s\"}", i), String.format("f1_%s", i), - Date.valueOf(LocalDate.now()), + java.sql.Date.valueOf(LocalDate.now()), Timestamp.valueOf(LocalDateTime.now()), new Timestamp(System.currentTimeMillis()), "test".getBytes(), @@ -262,6 +285,15 @@ Pair> initTestData() { }); rows.add(row); } + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + null, null, null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, null, null, null, null, + null, null, null, null, null, null, null, null, + }); + rows.add(row); return Pair.of(fieldNames, rows); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java index 75bdffbd6ca..07b3576701c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.Assertions; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.OracleContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -34,11 +35,14 @@ import com.google.common.collect.Lists; import java.math.BigDecimal; -import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -60,6 +64,22 @@ public class JdbcOracleIT extends AbstractJdbcIT { private static final List CONFIG_FILE = Lists.newArrayList("/jdbc_oracle_source_to_sink.conf"); + private static final String[] fieldNames = + new String[] { + "VARCHAR_10_COL", + "CHAR_10_COL", + "CLOB_COL", + "NUMBER_3_SF_2_DP", + "INTEGER_COL", + "FLOAT_COL", + "REAL_COL", + "BINARY_FLOAT_COL", + "BINARY_DOUBLE_COL", + "DATE_COL", + "TIMESTAMP_WITH_3_FRAC_SEC_COL", + "TIMESTAMP_WITH_LOCAL_TZ" + }; + private static final String CREATE_SQL = "create table %s\n" + "(\n" @@ -116,7 +136,23 @@ JdbcCase getJdbcCase() { } @Override - void compareResult() {} + void compareResult() { + try (Statement statement = connection.createStatement()) { + String whereStr = + Arrays.stream(fieldNames) + .map(field -> field + " is null") + .collect(java.util.stream.Collectors.joining(" and ")); + ResultSet resultSet = + statement.executeQuery( + String.format( + "select count(1) from %s where %s", + getJdbcCase().getSinkTable(), whereStr)); + resultSet.next(); + Assertions.assertEquals(resultSet.getInt(1), 1); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } @Override String driverUrl() { @@ -125,22 +161,6 @@ String driverUrl() { @Override Pair> initTestData() { - String[] fieldNames = - new String[] { - "VARCHAR_10_COL", - "CHAR_10_COL", - "CLOB_COL", - "NUMBER_3_SF_2_DP", - "INTEGER_COL", - "FLOAT_COL", - "REAL_COL", - "BINARY_FLOAT_COL", - "BINARY_DOUBLE_COL", - "DATE_COL", - "TIMESTAMP_WITH_3_FRAC_SEC_COL", - "TIMESTAMP_WITH_LOCAL_TZ" - }; - List rows = new ArrayList<>(); for (int i = 0; i < 100; i++) { SeaTunnelRow row = @@ -155,13 +175,17 @@ Pair> initTestData() { Float.parseFloat("2.2"), Float.parseFloat("22.2"), Double.parseDouble("2.2"), - Date.valueOf(LocalDate.now()), + java.sql.Date.valueOf(LocalDate.now()), Timestamp.valueOf(LocalDateTime.now()), Timestamp.valueOf(LocalDateTime.now()) }); rows.add(row); } - + rows.add( + new SeaTunnelRow( + new Object[] { + null, null, null, null, null, null, null, null, null, null, null, null + })); return Pair.of(fieldNames, rows); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java index 13adec70084..8d2f9ce790b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java @@ -88,8 +88,8 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " date_col DATE,\n" + " timestamp_col TIMESTAMP,\n" + " bpchar_col BPCHAR(10),\n" - + " age INT NOT null,\n" - + " name VARCHAR(255) NOT null,\n" + + " age INT null,\n" + + " name VARCHAR(255) null,\n" + " point geometry(POINT, 4326),\n" + " linestring geometry(LINESTRING, 4326),\n" + " polygon_colums geometry(POLYGON, 4326),\n" @@ -119,8 +119,8 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou + " \"DATE_COL\" DATE,\n" + " \"TIMESTAMP_COL\" TIMESTAMP,\n" + " \"BPCHAR_COL\" BPCHAR(10),\n" - + " \"AGE\" int4 NOT NULL,\n" - + " \"NAME\" varchar(255) NOT NULL,\n" + + " \"AGE\" int4 NULL,\n" + + " \"NAME\" varchar(255) NULL,\n" + " \"POINT\" varchar(2000) NULL,\n" + " \"LINESTRING\" varchar(2000) NULL,\n" + " \"POLYGON_COLUMS\" varchar(2000) NULL,\n" @@ -336,6 +336,70 @@ private void initializeJdbcTable() { + " ST_GeographyFromText('POINT(-122.3452 47.5925)')\n" + " )"); } + statement.addBatch( + "INSERT INTO\n" + + " pg_ide_source_table (gid,\n" + + " text_col,\n" + + " varchar_col,\n" + + " char_col,\n" + + " boolean_col,\n" + + " smallint_col,\n" + + " integer_col,\n" + + " bigint_col,\n" + + " decimal_col,\n" + + " numeric_col,\n" + + " real_col,\n" + + " double_precision_col,\n" + + " smallserial_col,\n" + + " serial_col,\n" + + " bigserial_col,\n" + + " date_col,\n" + + " timestamp_col,\n" + + " bpchar_col,\n" + + " age,\n" + + " name,\n" + + " point,\n" + + " linestring,\n" + + " polygon_colums,\n" + + " multipoint,\n" + + " multilinestring,\n" + + " multipolygon,\n" + + " geometrycollection,\n" + + " geog\n" + + " )\n" + + "VALUES\n" + + " (\n" + + " '" + + 11 + + "',\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " 1,\n" + + " 100,\n" + + " 10000,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL,\n" + + " NULL\n" + + " )"); statement.executeBatch(); } catch (SQLException e) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf index b91ee9d3177..e627925609e 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink.conf @@ -27,8 +27,13 @@ source { connection_check_timeout_sec = 100 user = "root" password = "Abc!@#135_seatunnel" - - query = "select * from source;" + partition_column = "id" + query = """select id, c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, + c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, + c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, + c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, + c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30 from source""" } } @@ -42,12 +47,12 @@ sink { user = "root" password = "Abc!@#135_seatunnel" - query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, + query = """insert into sink (id, c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf index c393e69cee2..e39629195e5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel.conf @@ -26,10 +26,14 @@ source { driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "Abc!@#135_seatunnel" - query = "select * from source" - partition_column = "c_decimal_unsigned_30" + partition_column = "id" + query = """select id, c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, + c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, + c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, + c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, + c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30 from source""" partition_num = 3 - result_table_name = "jdbc" } } @@ -45,12 +49,12 @@ sink { user = "root" password = "Abc!@#135_seatunnel" connection_check_timeout_sec = 100 - query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, + query = """insert into sink (id, c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf index 0460ccdf3ba..9924f1d0584 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_parallel_upper_lower.conf @@ -26,11 +26,16 @@ source { driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "Abc!@#135_seatunnel" - query = "select * from source" - partition_column = "c_bigint_30" + query = """select id, c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, + c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, + c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, + c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, + c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, + c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30 from source""" + partition_column = "id" result_table_name = "jdbc" - partition_lower_bound = 2844674407371055160 - partition_upper_bound = 2844674407371055259 + partition_lower_bound = 50 + partition_upper_bound = 102 partition_num = 5 } } @@ -46,12 +51,12 @@ sink { user = "root" password = "Abc!@#135_seatunnel" connection_check_timeout_sec = 100 - query = """insert into sink (c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, + query = """insert into sink (id, c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_year, c_int_unsigned, c_integer_unsigned,c_bigint_30,c_decimal_unsigned_30,c_decimal_30) - values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);""" } } \ No newline at end of file