Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bugfix] [connector-jdbc] Fix when type is int and null value will always get 0 instead of null #4812

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,46 +50,47 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S
int resultSetIndex = fieldIndex + 1;
switch (seaTunnelDataType.getSqlType()) {
case STRING:
fields[fieldIndex] = rs.getString(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, String.class);
break;
case BOOLEAN:
fields[fieldIndex] = rs.getBoolean(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Boolean.class);
break;
case TINYINT:
fields[fieldIndex] = rs.getByte(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Byte.class);
break;
case SMALLINT:
fields[fieldIndex] = rs.getShort(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Short.class);
break;
case INT:
fields[fieldIndex] = rs.getInt(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Integer.class);
break;
case BIGINT:
fields[fieldIndex] = rs.getLong(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Long.class);
break;
case FLOAT:
fields[fieldIndex] = rs.getFloat(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Float.class);
break;
case DOUBLE:
fields[fieldIndex] = rs.getDouble(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Double.class);
break;
case DECIMAL:
fields[fieldIndex] = rs.getBigDecimal(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, BigDecimal.class);
break;
case DATE:
Date sqlDate = rs.getDate(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
Optional.ofNullable(rs.getObject(resultSetIndex, Date.class))
.map(e -> e.toLocalDate())
.orElse(null);
break;
case TIME:
Time sqlTime = rs.getTime(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
Optional.ofNullable(rs.getObject(resultSetIndex, Time.class))
.map(e -> e.toLocalTime())
.orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
Optional.ofNullable(rs.getObject(resultSetIndex, Timestamp.class))
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -59,47 +60,48 @@ public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws S
? null
: rs.getObject(resultSetIndex).toString();
} else {
fields[fieldIndex] = rs.getString(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, String.class);
}
break;
case BOOLEAN:
fields[fieldIndex] = rs.getBoolean(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Boolean.class);
break;
case TINYINT:
fields[fieldIndex] = rs.getByte(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Byte.class);
break;
case SMALLINT:
fields[fieldIndex] = rs.getShort(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Short.class);
break;
case INT:
fields[fieldIndex] = rs.getInt(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Integer.class);
break;
case BIGINT:
fields[fieldIndex] = rs.getLong(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Long.class);
break;
case FLOAT:
fields[fieldIndex] = rs.getFloat(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Float.class);
break;
case DOUBLE:
fields[fieldIndex] = rs.getDouble(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, Double.class);
break;
case DECIMAL:
fields[fieldIndex] = rs.getBigDecimal(resultSetIndex);
fields[fieldIndex] = rs.getObject(resultSetIndex, BigDecimal.class);
break;
case DATE:
Date sqlDate = rs.getDate(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlDate).map(e -> e.toLocalDate()).orElse(null);
Optional.ofNullable(rs.getObject(resultSetIndex, Date.class))
.map(e -> e.toLocalDate())
.orElse(null);
break;
case TIME:
Time sqlTime = rs.getTime(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTime).map(e -> e.toLocalTime()).orElse(null);
Optional.ofNullable(rs.getObject(resultSetIndex, Time.class))
.map(e -> e.toLocalTime())
.orElse(null);
break;
case TIMESTAMP:
Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex);
fields[fieldIndex] =
Optional.ofNullable(sqlTimestamp)
Optional.ofNullable(rs.getObject(resultSetIndex, Timestamp.class))
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,9 @@ public void testJdbcDb(TestContainer container)
for (String configFile : configFiles) {
Container.ExecResult execResult = container.executeJob(configFile);
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
compareResult();
clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable());
}

compareResult();
clearTable(jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSinkTable());
}

protected void initCatalog() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public enum JdbcITErrorCode implements SeaTunnelErrorCode {
CREATE_TABLE_FAILED("JDBC-IT-02", "Fail to create table."),
INSERT_DATA_FAILED("JDBC-IT-03", "Fail to inert data."),
DRIVER_NOT_FOUND("JDBC-IT-04", "Can not get the driver."),
QUERY_TABLE_FAILED("JDBC-IT-05", "Fail to query table."),
;

private final String code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.commons.lang3.tuple.Pair;

import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
Expand All @@ -34,7 +35,9 @@
import com.google.common.collect.Lists;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -67,6 +70,7 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
private static final String CREATE_SQL =
"CREATE TABLE IF NOT EXISTS %s\n"
+ "(\n"
+ " `id` bigint(18) NOT NULL AUTO_INCREMENT,\n"
+ " `c_bit_1` bit(1) DEFAULT NULL,\n"
+ " `c_bit_8` bit(8) DEFAULT NULL,\n"
+ " `c_bit_16` bit(16) DEFAULT NULL,\n"
Expand Down Expand Up @@ -110,9 +114,58 @@ public class JdbcMysqlIT extends AbstractJdbcIT {
+ " `c_integer_unsigned` int(10) unsigned DEFAULT NULL,\n"
+ " `c_bigint_30` BIGINT(40) unsigned DEFAULT NULL,\n"
+ " `c_decimal_unsigned_30` DECIMAL(30) unsigned DEFAULT NULL,\n"
+ " `c_decimal_30` DECIMAL(30) DEFAULT NULL\n"
+ " `c_decimal_30` DECIMAL(30) DEFAULT NULL,\n"
+ " PRIMARY KEY(`id`)"
+ ");";

private static final String[] fieldNames =
new String[] {
"c_bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
"c_bit_64",
"c_boolean",
"c_tinyint",
"c_tinyint_unsigned",
"c_smallint",
"c_smallint_unsigned",
"c_mediumint",
"c_mediumint_unsigned",
"c_int",
"c_integer",
"c_year",
"c_int_unsigned",
"c_integer_unsigned",
"c_bigint",
"c_bigint_unsigned",
"c_decimal",
"c_decimal_unsigned",
"c_float",
"c_float_unsigned",
"c_double",
"c_double_unsigned",
"c_char",
"c_tinytext",
"c_mediumtext",
"c_text",
"c_varchar",
"c_json",
"c_longtext",
"c_date",
"c_datetime",
"c_timestamp",
"c_tinyblob",
"c_mediumblob",
"c_blob",
"c_longblob",
"c_varbinary",
"c_binary",
"c_bigint_30",
"c_decimal_unsigned_30",
"c_decimal_30",
};

@Override
JdbcCase getJdbcCase() {
Map<String, String> containerEnv = new HashMap<>();
Expand Down Expand Up @@ -147,7 +200,25 @@ JdbcCase getJdbcCase() {
}

@Override
void compareResult() {}
void compareResult() {
try (Statement statement = connection.createStatement()) {
ResultSet resultSet =
statement.executeQuery(
String.format(
"select count(1) from %s.%s where c_bit_1 is null and c_bit_8 is null and c_bit_16 is null and c_bit_32"
+ " is null and c_bit_64 is null and c_boolean is null and c_tinyint is null and c_tinyint_unsigned is null and c_smallint is null and c_smallint_unsigned is null "
+ "and c_mediumint is null and c_mediumint_unsigned is null and c_int is null and c_integer is null and c_bigint is null and c_bigint_unsigned is null and c_decimal is null "
+ "and c_decimal_unsigned is null and c_float is null and c_float_unsigned is null and c_double is null and c_double_unsigned is null and c_char is null and c_tinytext is null "
+ "and c_mediumtext is null and c_text is null and c_varchar is null and c_json is null and c_longtext is null and c_date is null and c_datetime is null and c_timestamp is null "
+ "and c_tinyblob is null and c_mediumblob is null and c_blob is null and c_longblob is null and c_varbinary is null and c_binary is null and c_year is null "
+ "and c_int_unsigned is null and c_integer_unsigned is null and c_bigint_30 is null and c_decimal_unsigned_30 is null and c_decimal_30 is null",
getJdbcCase().getDatabase(), getJdbcCase().getSinkTable()));
resultSet.next();
Assertions.assertEquals(resultSet.getInt(1), 1);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

@Override
String driverUrl() {
Expand All @@ -156,54 +227,6 @@ String driverUrl() {

@Override
Pair<String[], List<SeaTunnelRow>> initTestData() {
String[] fieldNames =
new String[] {
"c_bit_1",
"c_bit_8",
"c_bit_16",
"c_bit_32",
"c_bit_64",
"c_boolean",
"c_tinyint",
"c_tinyint_unsigned",
"c_smallint",
"c_smallint_unsigned",
"c_mediumint",
"c_mediumint_unsigned",
"c_int",
"c_integer",
"c_year",
"c_int_unsigned",
"c_integer_unsigned",
"c_bigint",
"c_bigint_unsigned",
"c_decimal",
"c_decimal_unsigned",
"c_float",
"c_float_unsigned",
"c_double",
"c_double_unsigned",
"c_char",
"c_tinytext",
"c_mediumtext",
"c_text",
"c_varchar",
"c_json",
"c_longtext",
"c_date",
"c_datetime",
"c_timestamp",
"c_tinyblob",
"c_mediumblob",
"c_blob",
"c_longblob",
"c_varbinary",
"c_binary",
"c_bigint_30",
"c_decimal_unsigned_30",
"c_decimal_30",
};

List<SeaTunnelRow> rows = new ArrayList<>();
BigDecimal bigintValue = new BigDecimal("2844674407371055000");
BigDecimal decimalValue = new BigDecimal("999999999999999999999999999899");
Expand Down Expand Up @@ -247,7 +270,7 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
String.format("f1_%s", i),
String.format("{\"aa\":\"bb_%s\"}", i),
String.format("f1_%s", i),
Date.valueOf(LocalDate.now()),
java.sql.Date.valueOf(LocalDate.now()),
Timestamp.valueOf(LocalDateTime.now()),
new Timestamp(System.currentTimeMillis()),
"test".getBytes(),
Expand All @@ -262,6 +285,15 @@ Pair<String[], List<SeaTunnelRow>> initTestData() {
});
rows.add(row);
}
SeaTunnelRow row =
new SeaTunnelRow(
new Object[] {
null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null,
});
rows.add(row);

return Pair.of(fieldNames, rows);
}
Expand Down
Loading