[Improvement][Seatunnel-web][Hive-JDBC] Add Hive-JDBC into seatunnel-web (#218)

arshadmohammad authored Sep 26, 2024
1 parent ddd25bf commit d110b97
Showing 11 changed files with 242 additions and 48 deletions.
@@ -73,6 +73,9 @@ public class DatasourceLoadConfig {
classLoaderFactoryName.put(
"JDBC-TIDB",
"org.apache.seatunnel.datasource.plugin.tidb.jdbc.TidbJdbcDataSourceFactory");
classLoaderFactoryName.put(
"JDBC-HIVE",
"org.apache.seatunnel.datasource.plugin.hive.jdbc.HiveJdbcDataSourceFactory");
classLoaderFactoryName.put(
"KAFKA", "org.apache.seatunnel.datasource.plugin.kafka.KafkaDataSourceFactory");
classLoaderFactoryName.put(
@@ -124,6 +127,7 @@ public class DatasourceLoadConfig {
classLoaderJarName.put("JDBC-STARROCKS", "datasource-jdbc-starrocks-");
classLoaderJarName.put("MONGODB", "datasource-mongodb-");
classLoaderJarName.put("JDBC-DB2", "datasource-jdbc-db2-");
classLoaderJarName.put("JDBC-HIVE", "datasource-jdbc-hive-");
classLoaderJarName.put("FAKESOURCE", "datasource-fakesource-");
classLoaderJarName.put("CONSOLE", "datasource-console-");
}
@@ -138,6 +142,7 @@ public class DatasourceLoadConfig {
"JDBC-Postgres",
"JDBC-SQLServer",
"JDBC-TiDB",
"JDBC-Hive",
"Kafka",
"MySQL-CDC",
"S3",
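The two maps above pair each datasource key with its factory class and its plugin jar name prefix. A minimal sketch of how such a registration is typically resolved; the lookup code here is illustrative only, not the actual SeaTunnel-Web loader:

import java.util.HashMap;
import java.util.Map;

public class HiveJdbcRegistrationSketch {
    public static void main(String[] args) {
        // Mirrors the two entries this commit adds to DatasourceLoadConfig.
        Map<String, String> factoryByKey = new HashMap<>();
        factoryByKey.put(
                "JDBC-HIVE",
                "org.apache.seatunnel.datasource.plugin.hive.jdbc.HiveJdbcDataSourceFactory");
        Map<String, String> jarPrefixByKey = new HashMap<>();
        jarPrefixByKey.put("JDBC-HIVE", "datasource-jdbc-hive-");

        // A loader can resolve both entries by the plugin key, locate the jar
        // whose file name starts with the prefix, and instantiate the factory.
        String key = "JDBC-HIVE";
        System.out.println(factoryByKey.get(key));   // factory class to load reflectively
        System.out.println(jarPrefixByKey.get(key)); // jar file name prefix to search for
    }
}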
@@ -25,7 +25,6 @@
<artifactId>datasource-hive</artifactId>

<properties>
<hive.exec.version>3.1.3</hive.exec.version>
<guava.version>24.1-jre</guava.version>
</properties>

@@ -51,7 +50,7 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.exec.version}</version>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@@ -31,6 +31,18 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
@@ -22,8 +22,7 @@
import java.util.Set;

public class HiveJdbcConstants {

public static final String PLUGIN_NAME = "JDBC-Hive";
public static final Set<String> HIVE_SYSTEM_DATABASES =
Sets.newHashSet(
"information_schema", "mysql", "performance_schema", "sys", "test", "hivedb");
Sets.newHashSet("information_schema", "performance_schema", "sys");
}
@@ -24,13 +24,13 @@

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -41,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

@Slf4j
public class HiveJdbcDataSourceChannel implements DataSourceChannel {
@@ -61,15 +62,15 @@ public List<String> getTables(
Map<String, String> requestParams,
String database,
Map<String, String> option) {
return getTables(pluginName, requestParams, database, option);
return getTableNames(requestParams, database);
}

@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try {
return getDataBaseNames(pluginName, requestParams);
} catch (SQLException e) {
return getDataBaseNames(requestParams);
} catch (SQLException | IOException e) {
log.error("Query Hive databases error, request params is {}", requestParams, e);
throw new DataSourcePluginException("Query Hive databases error,", e);
}
@@ -104,33 +105,69 @@ public Map<String, List<TableField>> getTableFields(
}

protected boolean checkJdbcConnectivity(Map<String, String> requestParams) {
try (Connection ignored = init(requestParams)) {
try (Connection ignored = getHiveConnection(requestParams)) {
return true;
} catch (Exception e) {
throw new DataSourcePluginException(
"check jdbc connectivity failed, " + e.getMessage(), e);
}
}

protected Connection init(Map<String, String> requestParams) throws SQLException {
protected Connection getHiveConnection(Map<String, String> requestParams)
throws IOException, SQLException {
if (MapUtils.isEmpty(requestParams)) {
throw new DataSourcePluginException(
"Hive jdbc request params is null, please check your config");
}
String url = requestParams.get(HiveJdbcOptionRule.URL.key());
return DriverManager.getConnection(url);
String driverClass =
requestParams.getOrDefault(
HiveJdbcOptionRule.DRIVER.key(), "org.apache.hive.jdbc.HiveDriver");
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
throw new DataSourcePluginException(
"Hive jdbc driver " + driverClass + " not found", e);
}
Properties connProps = new Properties();
boolean isKerberosEnabled =
Boolean.parseBoolean(requestParams.get(HiveJdbcOptionRule.USE_KERBEROS.key()));
if (isKerberosEnabled) {
String krb5ConfPath = requestParams.get(HiveJdbcOptionRule.KRB5_PATH.key());
if (StringUtils.isNotEmpty(krb5ConfPath)) {
System.setProperty("java.security.krb5.conf", krb5ConfPath);
}
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(conf);
String principal = requestParams.get(HiveJdbcOptionRule.KERBEROS_PRINCIPAL.key());
connProps.setProperty("principal", principal);
String keytabPath = requestParams.get(HiveJdbcOptionRule.KERBEROS_KEYTAB_PATH.key());
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
}

String user = requestParams.get(HiveJdbcOptionRule.USER.key());
String password = requestParams.get(HiveJdbcOptionRule.PASSWORD.key());
if (StringUtils.isNotEmpty(user)) {
connProps.setProperty("user", user);
}
if (StringUtils.isNotEmpty(password)) {
connProps.setProperty("password", password);
}

String jdbcUrl = requestParams.get(HiveJdbcOptionRule.URL.key());
return DriverManager.getConnection(jdbcUrl, connProps);
}

protected List<String> getDataBaseNames(String pluginName, Map<String, String> requestParams)
throws SQLException {
protected List<String> getDataBaseNames(Map<String, String> requestParams)
throws SQLException, IOException {
List<String> dbNames = new ArrayList<>();
try (Connection connection = init(requestParams);
Statement statement = connection.createStatement(); ) {
ResultSet re = statement.executeQuery("SHOW DATABASES;");
try (Connection connection = getHiveConnection(requestParams);
Statement statement = connection.createStatement()) {
ResultSet re = statement.executeQuery("SHOW DATABASES");
// filter system databases
while (re.next()) {
String dbName = re.getString("database");
if (StringUtils.isNotBlank(dbName) && isNotSystemDatabase(pluginName, dbName)) {
String dbName = re.getString("database_name");
if (StringUtils.isNotBlank(dbName) && isNotSystemDatabase(dbName)) {
dbNames.add(dbName);
}
}
@@ -140,25 +177,27 @@ protected List<String> getDataBaseNames(String pluginName, Map<String, String> requestParams)

protected List<String> getTableNames(Map<String, String> requestParams, String dbName) {
List<String> tableNames = new ArrayList<>();
try (Connection connection = init(requestParams); ) {
try (Connection connection = getHiveConnection(requestParams)) {
ResultSet resultSet =
connection.getMetaData().getTables(dbName, null, null, new String[] {"TABLE"});
connection
.getMetaData()
.getTables(dbName, dbName, null, new String[] {"TABLE"});
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
tableNames.add(tableName);
}
}
return tableNames;
} catch (SQLException e) {
} catch (SQLException | IOException e) {
throw new DataSourcePluginException("get table names failed", e);
}
}

protected List<TableField> getTableFields(
Map<String, String> requestParams, String dbName, String tableName) {
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = init(requestParams); ) {
try (Connection connection = getHiveConnection(requestParams)) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, dbName, tableName);
ResultSet resultSet = metaData.getColumns(dbName, null, tableName, null);
@@ -177,7 +216,7 @@ protected List<TableField> getTableFields(
tableField.setNullable(isNullable);
tableFields.add(tableField);
}
} catch (SQLException e) {
} catch (SQLException | IOException e) {
throw new DataSourcePluginException("get table fields failed", e);
}
return tableFields;
@@ -186,25 +225,14 @@ protected List<TableField> getTableFields(
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
if (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("COLUMN_NAME");
}
return null;
}

@SuppressWarnings("checkstyle:MagicNumber")
private static boolean checkHostConnectable(String host, int port) {
try (Socket socket = new Socket()) {
socket.connect(new InetSocketAddress(host, port), 1000);
return true;
} catch (IOException e) {
return false;
}
}

private boolean isNotSystemDatabase(String pluginName, String dbName) {
// FIXME,filters system databases
return true;
private boolean isNotSystemDatabase(String dbName) {
return !HiveJdbcConstants.HIVE_SYSTEM_DATABASES.contains(dbName.toLowerCase());
}

private boolean convertToBoolean(Object value) {
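A hedged usage sketch for the new channel: it builds the request params from the option keys defined in HiveJdbcOptionRule (below) and lists the non-system databases. The import is assumed from the factory class name registered above, the principal and paths are placeholders, and a reachable HiveServer2 plus a valid keytab are presumed:

import org.apache.seatunnel.datasource.plugin.hive.jdbc.HiveJdbcDataSourceChannel; // assumed package

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HiveJdbcChannelExample {
    public static void main(String[] args) {
        // Keys mirror HiveJdbcOptionRule; values are placeholders.
        Map<String, String> params = new HashMap<>();
        params.put("url", "jdbc:hive2://localhost:10000/default");
        params.put("driver", "org.apache.hive.jdbc.HiveDriver");
        params.put("use_kerberos", "true");
        params.put("kerberos_principal", "test_user@EXAMPLE.COM");
        params.put("kerberos_keytab_path", "/home/test/test_user.keytab");
        params.put("krb5_path", "/etc/krb5.conf");

        // getDatabases() logs in via the keytab, runs SHOW DATABASES, and
        // filters out HIVE_SYSTEM_DATABASES.
        List<String> databases =
                new HiveJdbcDataSourceChannel().getDatabases("JDBC-Hive", params);
        databases.forEach(System.out::println);
    }
}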
@@ -28,17 +28,17 @@
public class HiveJdbcDataSourceFactory implements DataSourceFactory {
@Override
public String factoryIdentifier() {
return "Hive-JDBC";
return HiveJdbcConstants.PLUGIN_NAME;
}

@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
DataSourcePluginInfo dataSourcePluginInfo =
DataSourcePluginInfo.builder()
.name("Hive-JDBC")
.name(HiveJdbcConstants.PLUGIN_NAME)
.type(DatasourcePluginTypeEnum.DATABASE.getCode())
.version("1.0.0")
.icon("Hive-JDBC")
.icon(HiveJdbcConstants.PLUGIN_NAME)
.supportVirtualTables(false)
.build();
Set<DataSourcePluginInfo> dataSourceInfos = new HashSet<>();
@@ -25,18 +25,65 @@ public class HiveJdbcOptionRule {

public static final Option<String> URL =
Options.key("url")
.stringType()
.defaultValue("jdbc:hive2://localhost:10000/default")
.withDescription(
"The URL of the JDBC connection. Refer to a case: jdbc:hive2://localhost:10000/default");

public static final Option<String> DRIVER =
Options.key("driver")
.stringType()
.defaultValue("org.apache.hive.jdbc.HiveDriver")
.withDescription(
"The jdbc class name used to connect to the remote data source");

public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("user");

public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("password");

public static final Option<Boolean> USE_KERBEROS =
Options.key("use_kerberos")
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable Kerberos, default is false.");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription(
"When use kerberos, we should set kerberos principal such as 'test_user@xxx'. ");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription(
"jdbc url, eg:"
+ "jdbc:hive2://localhost:10000/default?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
"When use kerberos, we should set kerberos principal file path such as '/home/test/test_user.keytab'. ");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static OptionRule optionRule() {
return OptionRule.builder().required(URL).build();
return OptionRule.builder()
.required(URL)
.required(DRIVER)
.optional(USER)
.optional(PASSWORD)
.optional(USE_KERBEROS)
.optional(KERBEROS_PRINCIPAL)
.optional(KERBEROS_KEYTAB_PATH)
.optional(KRB5_PATH)
.build();
}

public static OptionRule metadataRule() {
// todo
return OptionRule.builder().build();
}
}
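For reference, a minimal standalone sketch of the non-Kerberos path that getHiveConnection takes, built only from the option defaults above. It assumes a local HiveServer2, and the 'hive' account is a placeholder:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class HiveJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Defaults from HiveJdbcOptionRule.DRIVER and HiveJdbcOptionRule.URL.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Properties props = new Properties();
        props.setProperty("user", "hive"); // placeholder account
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:hive2://localhost:10000/default", props);
                Statement stmt = conn.createStatement();
                // Same query and result column as getDataBaseNames above.
                ResultSet rs = stmt.executeQuery("SHOW DATABASES")) {
            while (rs.next()) {
                System.out.println(rs.getString("database_name"));
            }
        }
    }
}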
4 changes: 4 additions & 0 deletions seatunnel-datasource/seatunnel-datasource-plugins/pom.xml
@@ -49,5 +49,9 @@
<module>datasource-fakesource</module>
<module>datasource-console</module>
</modules>
<properties>
<hive.version>3.1.3</hive.version>
<hadoop.version>3.1.0</hadoop.version>
</properties>

</project>
@@ -28,6 +28,7 @@

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -85,7 +86,7 @@ public static class SeaTunnelDataTypeConvertor

@Override
public SeaTunnelDataType<?> toSeaTunnelType(String field, String connectorDataType) {
return DATA_TYPE_MAP.get(connectorDataType).getRawType();
return DATA_TYPE_MAP.get(connectorDataType.toLowerCase(Locale.ROOT)).getRawType();
}

@Override
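The Locale.ROOT normalization matters because Hive can report column types in upper case. A tiny sketch of the lookup failure the change avoids; the map entry is illustrative, assuming DATA_TYPE_MAP uses lower-case keys as the lookup above implies:

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class TypeKeySketch {
    public static void main(String[] args) {
        Map<String, String> dataTypeMap = new HashMap<>();
        dataTypeMap.put("string", "STRING"); // illustrative entry, lower-case key
        String reported = "STRING";          // Hive metadata may report upper case
        System.out.println(dataTypeMap.get(reported));                          // null before the fix
        System.out.println(dataTypeMap.get(reported.toLowerCase(Locale.ROOT))); // STRING after
    }
}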