Skip to content

Commit 2b92b44

Browse files
committed
enable support for custom session configurations
Fix the description of "Read Change Data Feed from a Table"
1 parent 3481ef2 commit 2b92b44

File tree

10 files changed

+326
-131
lines changed

10 files changed

+326
-131
lines changed

client/src/main/scala/io/delta/sharing/client/DeltaSharingFileSystem.scala

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,23 @@
1616

1717
package io.delta.sharing.client
1818

19+
import java.io.File
1920
import java.net.{URI, URLDecoder, URLEncoder}
2021
import java.util.concurrent.TimeUnit
2122

2223
import org.apache.hadoop.fs._
2324
import org.apache.hadoop.fs.permission.FsPermission
2425
import org.apache.hadoop.util.Progressable
2526
import org.apache.http.{HttpClientConnection, HttpHost, HttpRequest, HttpResponse}
26-
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
2727
import org.apache.http.client.config.RequestConfig
2828
import org.apache.http.client.utils.URIBuilder
2929
import org.apache.http.conn.routing.HttpRoute
30-
import org.apache.http.impl.client.{BasicCredentialsProvider, HttpClientBuilder, RequestWrapper}
30+
import org.apache.http.conn.ssl.NoopHostnameVerifier
31+
import org.apache.http.conn.ssl.TrustSelfSignedStrategy
32+
import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder, RequestWrapper}
3133
import org.apache.http.impl.conn.{DefaultRoutePlanner, DefaultSchemePortResolver}
3234
import org.apache.http.protocol.{HttpContext, HttpRequestExecutor}
35+
import org.apache.http.ssl.SSLContextBuilder
3336
import org.apache.spark.SparkEnv
3437
import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher}
3538
import org.apache.spark.internal.Logging
@@ -44,18 +47,20 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
4447

4548
lazy private val numRetries = ConfUtils.numRetries(getConf)
4649
lazy private val maxRetryDurationMillis = ConfUtils.maxRetryDurationMillis(getConf)
47-
lazy private val timeoutInSeconds = ConfUtils.timeoutInSeconds(getConf)
50+
lazy private val timeoutInMillis = ConfUtils.getTimeoutInMillis(getConf)
4851
lazy private val httpClient = createHttpClient()
4952

50-
private[sharing] def createHttpClient() = {
53+
private[sharing] def createHttpClient(): CloseableHttpClient = {
5154
val proxyConfigOpt = ConfUtils.getProxyConfig(getConf)
5255
val maxConnections = ConfUtils.maxConnections(getConf)
56+
val customHeadersOpt = ConfUtils.getCustomHeaders(getConf)
57+
5358
val config = RequestConfig.custom()
54-
.setConnectTimeout(timeoutInSeconds * 1000)
55-
.setConnectionRequestTimeout(timeoutInSeconds * 1000)
56-
.setSocketTimeout(timeoutInSeconds * 1000).build()
59+
.setConnectTimeout(timeoutInMillis)
60+
.setConnectionRequestTimeout(timeoutInMillis)
61+
.setSocketTimeout(timeoutInMillis).build()
5762

58-
logDebug(s"Creating delta sharing httpClient with timeoutInSeconds: $timeoutInSeconds.")
63+
logDebug(s"Creating delta sharing httpClient with timeoutInMillis: $timeoutInMillis.")
5964
val clientBuilder = HttpClientBuilder.create()
6065
.setMaxConnTotal(maxConnections)
6166
.setMaxConnPerRoute(maxConnections)
@@ -66,10 +71,15 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
6671

6772
// Set proxy if provided.
6873
proxyConfigOpt.foreach { proxyConfig =>
69-
7074
val proxy = new HttpHost(proxyConfig.host, proxyConfig.port)
7175
clientBuilder.setProxy(proxy)
7276

77+
if (proxyConfig.authToken.nonEmpty) {
78+
clientBuilder.addInterceptorFirst((request: HttpRequest, _: HttpContext) => {
79+
request.addHeader("Proxy-Authorization", s"Bearer ${proxyConfig.authToken}")
80+
})
81+
}
82+
7383
val neverUseHttps = ConfUtils.getNeverUseHttps(getConf)
7484
if (neverUseHttps) {
7585
val httpRequestDowngradeExecutor = new HttpRequestExecutor {
@@ -94,6 +104,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
94104
}
95105
clientBuilder.setRequestExecutor(httpRequestDowngradeExecutor)
96106
}
107+
97108
if (proxyConfig.noProxyHosts.nonEmpty || neverUseHttps) {
98109
val routePlanner = new DefaultRoutePlanner(DefaultSchemePortResolver.INSTANCE) {
99110
override def determineRoute(target: HttpHost,
@@ -110,6 +121,28 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
110121
}
111122
clientBuilder.setRoutePlanner(routePlanner)
112123
}
124+
125+
if (proxyConfig.sslTrustAll) {
126+
clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
127+
clientBuilder.setSSLContext(
128+
new SSLContextBuilder()
129+
.loadTrustMaterial(null, new TrustSelfSignedStrategy)
130+
.build()
131+
)
132+
} else if (proxyConfig.caCertPath.nonEmpty) {
133+
clientBuilder.setSSLContext(
134+
new SSLContextBuilder()
135+
.loadTrustMaterial(new File(proxyConfig.caCertPath.getOrElse("")), null)
136+
.build()
137+
)
138+
}
139+
140+
customHeadersOpt.foreach { headers =>
141+
ConfUtils.validateCustomHeaders(headers)
142+
clientBuilder.addInterceptorFirst((request: HttpRequest, _: HttpContext) => {
143+
headers.foreach { case (key, value) => request.addHeader(key, value) }
144+
})
145+
}
113146
}
114147
clientBuilder.build()
115148
}
@@ -122,7 +155,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
122155

123156
/** Returns the URI scheme of this file system, taken from the `SCHEME` constant. */
override def getScheme: String = SCHEME
124157

125-
override def getUri(): URI = URI.create(s"$SCHEME:///")
158+
/** Returns the root URI of this file system: the `SCHEME` constant followed by `:///`. */
override def getUri: URI = URI.create(s"$SCHEME:///")
126159

127160
// open a file path with the format below:
128161
// ```
@@ -204,7 +237,7 @@ private[sharing] class DeltaSharingFileSystem extends FileSystem with Logging {
204237

205238
private[sharing] object DeltaSharingFileSystem {
206239

207-
val SCHEME = "delta-sharing"
240+
private val SCHEME = "delta-sharing"
208241

209242
case class DeltaSharingPath(tablePath: String, fileId: String, fileSize: Long) {
210243

client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package io.delta.sharing.client.util
1818

19+
import com.fasterxml.jackson.databind.ObjectMapper
1920
import java.util.concurrent.TimeUnit
20-
2121
import org.apache.hadoop.conf.Configuration
2222
import org.apache.spark.network.util.JavaUtils
2323
import org.apache.spark.sql.internal.SQLConf
@@ -81,6 +81,10 @@ object ConfUtils {
8181
val PROXY_PORT = "spark.delta.sharing.network.proxyPort"
8282
val NO_PROXY_HOSTS = "spark.delta.sharing.network.noProxyHosts"
8383

84+
val CUSTOM_HEADERS = "spark.delta.sharing.network.customHeaders"
85+
val PROXY_AUTH_TOKEN = "spark.delta.sharing.network.proxyAuthToken"
86+
val CA_CERT_PATH = "spark.delta.sharing.network.caCertPath"
87+
8488
val OAUTH_RETRIES_CONF = "spark.delta.sharing.oauth.tokenExchangeMaxRetries"
8589
val OAUTH_RETRIES_DEFAULT = 5
8690

@@ -98,18 +102,55 @@ object ConfUtils {
98102
/**
 * Builds the proxy settings from `conf`.
 *
 * Returns `None` when neither `PROXY_HOST` nor `PROXY_PORT` is set. Otherwise both are
 * passed through `validateNonEmpty`, the port is parsed as an integer and passed through
 * `validatePortNumber`, and the optional settings (no-proxy hosts, auth token, CA
 * certificate path, trust-all flag) are read alongside them.
 *
 * @param conf Hadoop configuration holding the `spark.delta.sharing.network.*` keys.
 * @return the proxy configuration, or `None` when no proxy is configured.
 * @throws NumberFormatException if the proxy port is not an integer.
 */
def getProxyConfig(conf: Configuration): Option[ProxyConfig] = {
  val proxyHost = conf.get(PROXY_HOST, null)
  val proxyPortAsString = conf.get(PROXY_PORT, null)

  if (proxyHost == null && proxyPortAsString == null) {
    None
  } else {
    validateNonEmpty(proxyHost, PROXY_HOST)
    validateNonEmpty(proxyPortAsString, PROXY_PORT)

    val proxyPort = proxyPortAsString.toInt
    validatePortNumber(proxyPort, PROXY_PORT)

    // A key that is present but blank is treated as unset, so callers never see Some("")
    // (which would otherwise yield an empty bearer token or an empty file path).
    def optionalSetting(key: String): Option[String] =
      Option(conf.get(key, null)).filter(_.trim.nonEmpty)

    Some(ProxyConfig(
      host = proxyHost,
      port = proxyPort,
      noProxyHosts = conf.getTrimmedStrings(NO_PROXY_HOSTS).toSeq,
      authToken = optionalSetting(PROXY_AUTH_TOKEN),
      caCertPath = optionalSetting(CA_CERT_PATH),
      // getBoolean always yields a value, so no emptiness check is needed for this flag.
      sslTrustAll = conf.getBoolean(SSL_TRUST_ALL_CONF, SSL_TRUST_ALL_DEFAULT.toBoolean)
    ))
  }
}
130+
131+
/**
 * Reads the custom HTTP headers configured under `CUSTOM_HEADERS`.
 *
 * The setting is expected to be a JSON object mapping header names to values, e.g.
 * `{"X-Team":"data","X-Trace":"1"}`. Non-string scalar values are converted with
 * `toString`; a JSON `null` value is kept as `null` so that `validateCustomHeaders`
 * can reject it.
 *
 * @param conf Hadoop configuration to read from.
 * @return the parsed headers, or `None` when the key is unset, empty, or the JSON
 *         literal `null`.
 */
def getCustomHeaders(conf: Configuration): Option[Map[String, String]] = {
  val headersString = conf.get(CUSTOM_HEADERS, null)
  if (headersString != null && headersString.nonEmpty) {
    // A plain ObjectMapper (no Scala module registered) cannot instantiate the abstract
    // scala.collection.immutable.Map, so deserialize into java.util.Map and copy.
    val parsed: java.util.Map[String, AnyRef] =
      new ObjectMapper().readValue(headersString, classOf[java.util.Map[String, AnyRef]])
    Option(parsed).map { javaMap =>
      val headers = Map.newBuilder[String, String]
      val entries = javaMap.entrySet().iterator()
      while (entries.hasNext) {
        val entry = entries.next()
        val value = entry.getValue
        headers += entry.getKey -> (if (value == null) null else value.toString)
      }
      headers.result()
    }
  } else {
    None
  }
}
141+
142+
/**
 * Checks that every custom header has a non-empty name and a non-null value.
 *
 * @param headers header name to header value.
 * @throws IllegalArgumentException (via `require`) on the first offending entry.
 */
def validateCustomHeaders(headers: Map[String, String]): Unit = {
  for ((headerName, headerValue) <- headers) {
    val nameIsPresent = headerName != null && headerName.nonEmpty
    require(nameIsPresent, "Custom header name must not be null or empty")
    require(headerValue != null, s"Custom header value for '$headerName' must not be null")
  }
}
148+
149+
/**
 * Reads the network timeout from `TIMEOUT_CONF` (falling back to `TIMEOUT_DEFAULT`) and
 * returns it in milliseconds.
 *
 * @param conf Hadoop configuration to read from.
 * @return the timeout in milliseconds, guaranteed to fit in an `Int`.
 * @throws IllegalArgumentException if the value exceeds `Int.MaxValue` milliseconds.
 */
def getTimeoutInMillis(conf: Configuration): Int = {
  val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT)
  // NOTE(review): timeStringAs appears to treat a value without a unit suffix as being in
  // the unit passed here, i.e. a bare "120" would mean 120 ms — confirm this is intended
  // for users who previously configured the timeout in seconds.
  val timeoutMillis = JavaUtils.timeStringAs(timeoutStr, TimeUnit.MILLISECONDS)
  validateNonNeg(timeoutMillis, TIMEOUT_CONF)
  // Long.toInt wraps silently; reject values that cannot be represented instead.
  if (timeoutMillis > Int.MaxValue) {
    throw new IllegalArgumentException(
      s"$TIMEOUT_CONF is too large: $timeoutStr exceeds ${Int.MaxValue} milliseconds")
  }
  timeoutMillis.toInt
}
114155

115156
def getNeverUseHttps(conf: Configuration): Boolean = {
@@ -325,7 +366,10 @@ object ConfUtils {
325366
}
326367

327368
/**
 * Proxy settings for the delta sharing HTTP client.
 *
 * @param host         proxy host name or address.
 * @param port         proxy port.
 * @param noProxyHosts hosts listed under the no-proxy setting; empty by default.
 * @param authToken    optional proxy auth token.
 * @param caCertPath   optional path to CA certificate material.
 * @param sslTrustAll  trust-all SSL flag; `false` by default.
 */
case class ProxyConfig(
    host: String,
    port: Int,
    noProxyHosts: Seq[String] = Seq.empty,
    authToken: Option[String] = None,
    caCertPath: Option[String] = None,
    sslTrustAll: Boolean = false)
331375
}

client/src/test/scala/io/delta/sharing/client/DeltaSharingFileSystemSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,13 @@ class DeltaSharingFileSystemSuite extends SparkFunSuite {
9292
proxyServer.initialize()
9393

9494
try {
95-
9695
// Create a ProxyConfig with the host and port of the local proxy server.
9796
val conf = new Configuration
9897
conf.set(ConfUtils.PROXY_HOST, proxyServer.getHost())
9998
conf.set(ConfUtils.PROXY_PORT, proxyServer.getPort().toString)
99+
conf.set(ConfUtils.PROXY_AUTH_TOKEN, "testAuthToken")
100+
conf.set(ConfUtils.CA_CERT_PATH, "/path/to/ca_cert.pem")
101+
conf.set(ConfUtils.SSL_TRUST_ALL_CONF, "true")
100102

101103
// Configure the httpClient to use the ProxyConfig.
102104
val fs = new DeltaSharingFileSystem() {

client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,13 +113,19 @@ class ConfUtilsSuite extends SparkFunSuite {
113113
val conf = newConf(Map(
114114
PROXY_HOST -> "1.2.3.4",
115115
PROXY_PORT -> "8080",
116-
NO_PROXY_HOSTS -> "localhost,127.0.0.1"
116+
NO_PROXY_HOSTS -> "localhost,127.0.0.1",
117+
PROXY_AUTH_TOKEN -> "testAuthToken",
118+
CA_CERT_PATH -> "/path/to/ca_cert.pem",
119+
SSL_TRUST_ALL_CONF -> "true"
117120
))
118121
val proxyConfig = getProxyConfig(conf)
119122
assert(proxyConfig.isDefined)
120123
assert(proxyConfig.get.host == "1.2.3.4")
121124
assert(proxyConfig.get.port == 8080)
122125
assert(proxyConfig.get.noProxyHosts == Seq("localhost", "127.0.0.1"))
126+
assert(proxyConfig.get.authToken.contains("testAuthToken"))
127+
assert(proxyConfig.get.caCertPath.contains("/path/to/ca_cert.pem"))
128+
assert(proxyConfig.get.sslTrustAll)
123129
}
124130

125131
test("getProxyConfig with only host and port") {
@@ -132,6 +138,9 @@ class ConfUtilsSuite extends SparkFunSuite {
132138
assert(proxyConfig.get.host == "1.2.3.4")
133139
assert(proxyConfig.get.port == 8080)
134140
assert(proxyConfig.get.noProxyHosts.isEmpty)
141+
assert(proxyConfig.get.authToken.isEmpty)
142+
assert(proxyConfig.get.caCertPath.isEmpty)
143+
assert(!proxyConfig.get.sslTrustAll)
135144
}
136145

137146
test("getProxyConfig with no proxy settings") {

client/src/test/scala/io/delta/sharing/client/util/RetryUtilsSuite.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer
2222

2323
import org.apache.spark.SparkFunSuite
2424

25-
import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus}
2625
import io.delta.sharing.client.util.RetryUtils._
2726
import io.delta.sharing.spark.MissingEndStreamActionException
2827

0 commit comments

Comments
 (0)