From 02be1e08ff2ab8923b679d0c95ca220fa9c6f9b8 Mon Sep 17 00:00:00 2001 From: Anqi Date: Sun, 7 Apr 2024 16:44:22 +0800 Subject: [PATCH] support auth for reader (#145) --- .../connector/NebulaSparkReaderExample.scala | 2 ++ .../nebula/connector/NebulaConfig.scala | 32 ++++++++++++++++--- .../connector/reader/NebulaReader.scala | 2 ++ .../com/vesoft/nebula/connector/package.scala | 18 +++++++++++ .../com/vesoft/nebula/connector/package.scala | 18 +++++++++++ .../com/vesoft/nebula/connector/package.scala | 12 +++++++ 6 files changed, 79 insertions(+), 5 deletions(-) diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala index eb52417..2845a39 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala @@ -48,6 +48,8 @@ object NebulaSparkReaderExample { .build() val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig .builder() + .withUser("root") // optional config + .withPasswd("nebula") // optional config .withSpace("test") .withLabel("person") .withNoColumn(false) diff --git a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala index 65a1c9b..556a8ce 100644 --- a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala +++ b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala @@ -769,6 +769,8 @@ class ReadNebulaConfig extends Serializable { var getPartitionNum: Int = _ var getLimit: Int = _ var getNgql: String = _ + var getUser: String = _ + var getPasswd: String = _ // todo add filter def this(space: String, @@ -776,7 +778,9 @@ class ReadNebulaConfig extends Serializable { returnCols: List[String], noColumn: Boolean, partitionNum: Int, - limit: Int) = { + limit: Int, + user: String, + passwd: String) = { this() this.getSpace = space this.getLabel = label @@ -784,6 +788,8 @@ class ReadNebulaConfig extends Serializable { this.getNoColumn = noColumn this.getPartitionNum = partitionNum this.getLimit = limit + this.getUser = user + this.getPasswd = passwd } def this(space: String, @@ -791,7 +797,9 @@ class ReadNebulaConfig extends Serializable { returnCols: List[String], noColumn: Boolean, ngql: String, - limit: Int) = { + limit: Int, + user: String, + passwd: String) = { this() this.getNgql = ngql this.getSpace = space @@ -800,6 +808,8 @@ class ReadNebulaConfig extends Serializable { this.getNoColumn = noColumn this.getLimit = limit this.getPartitionNum = 1 + this.getUser = user + this.getPasswd = passwd } } @@ -817,6 +827,8 @@ object ReadNebulaConfig { var partitionNum: Int = 100 var limit: Int = 1000 var ngql: String = _ + var user: String = _ + var passwd: String = _ def withSpace(space: String): ReadConfigBuilder = { this.space = space @@ -864,12 +876,22 @@ object ReadNebulaConfig { this } + def withUser(user: String): ReadConfigBuilder = { + this.user = user + this + } + + def withPasswd(passwd: String): ReadConfigBuilder = { + this.passwd = passwd + this + } + def build(): ReadNebulaConfig = { check() - if (ngql != null && !ngql.isEmpty) { - new ReadNebulaConfig(space, label, returnCols.toList, noColumn, ngql, limit) + if (ngql != null && ngql.nonEmpty) { + new ReadNebulaConfig(space, label, returnCols.toList, noColumn, ngql, limit, user, passwd) } else { - new ReadNebulaConfig(space, label, returnCols.toList, noColumn, partitionNum, limit) + new ReadNebulaConfig(space, label, returnCols.toList, noColumn, partitionNum, limit, user, passwd) } } diff --git a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/reader/NebulaReader.scala b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/reader/NebulaReader.scala index c509d7e..31e6ef6 100644 --- a/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/reader/NebulaReader.scala +++ b/nebula-spark-common/src/main/scala/com/vesoft/nebula/connector/reader/NebulaReader.scala @@ -98,6 +98,8 @@ trait NebulaReader { } else { this.storageClient = new StorageClient(address.asJava, nebulaOptions.timeout) } + storageClient.setUser(nebulaOptions.user) + storageClient.setPassword(nebulaOptions.passwd) if (!storageClient.connect()) { throw new GraphConnectException("storage connect failed.") diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala index b226e54..4c221b5 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -61,6 +61,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { @@ -99,6 +105,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { @@ -139,6 +151,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { diff --git a/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala index 1abd2e1..6872969 100644 --- a/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector_2.2/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -124,6 +124,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { @@ -162,6 +168,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { @@ -201,6 +213,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { diff --git a/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala index 3a9cdf2..7564525 100644 --- a/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala +++ b/nebula-spark-connector_3.0/src/main/scala/com/vesoft/nebula/connector/package.scala @@ -61,6 +61,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match { @@ -99,6 +105,12 @@ package object connector { .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL) .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL) + if (readConfig.getUser != null && readConfig.getPasswd != null) { + dfReader + .option(NebulaOptions.USER_NAME, readConfig.getUser) + .option(NebulaOptions.PASSWD, readConfig.getPasswd) + } + if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) { dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType) SSLSignType.withName(connectionConfig.getSignType) match {