Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[sharing] class DeltaSharingDataSource
val options = new DeltaSharingOptions(parameters)
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)

val deltaLog = RemoteDeltaLog(path)
val deltaLog = RemoteDeltaLog(path, options.shareCredentialsOptions)
deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions)
}

Expand All @@ -68,7 +68,7 @@ private[sharing] class DeltaSharingDataSource
}

val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
val deltaLog = RemoteDeltaLog(path, forStreaming = true)
val deltaLog = RemoteDeltaLog(path, options.shareCredentialsOptions, forStreaming = true)
val schemaToUse = deltaLog.snapshot().schema
if (schemaToUse.isEmpty) {
throw DeltaSharingErrors.schemaNotSetException
Expand All @@ -93,7 +93,7 @@ private[sharing] class DeltaSharingDataSource
}
val options = new DeltaSharingOptions(parameters)
val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException)
val deltaLog = RemoteDeltaLog(path, forStreaming = true)
val deltaLog = RemoteDeltaLog(path, options.shareCredentialsOptions, forStreaming = true)

DeltaSharingSource(SparkSession.active, deltaLog, options)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {

val timestampAsOf = options.get(TIME_TRAVEL_TIMESTAMP).map(getFormattedTimestamp(_))

val shareCredentialsOptions: Map[String, String] = prepareShareCredentialsOptions()

def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined

// Parse the input timestamp string and TimestampType, and generate a formatted timestamp string
Expand Down Expand Up @@ -124,6 +126,21 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
}
}

/**
 * Collect the share-credential options that the caller actually supplied.
 *
 * Only keys listed in `validShareCredentialsOptions` are considered. The value stored under
 * `PROFILE_EXPIRATION_TIME` is passed through `getFormattedTimestamp`; every other value is
 * copied unchanged.
 *
 * @return the supplied credential options keyed by option name; empty when none were supplied
 */
private def prepareShareCredentialsOptions(): Map[String, String] = {
  validShareCredentialsOptions.keys.flatMap { key =>
    // A single `options.get` both tests for presence and yields the value, so the
    // separate `contains` check and the unsafe `Option.get` are not needed.
    options.get(key).map { rawValue =>
      val value = key match {
        case PROFILE_EXPIRATION_TIME => getFormattedTimestamp(rawValue)
        case _ => rawValue
      }
      key -> value
    }
  }.toMap
}

private def validateOneStartingOption(): Unit = {
if (startingTimestamp.isDefined && startingVersion.isDefined) {
throw DeltaSharingErrors.versionAndTimestampBothSetException(
Expand Down Expand Up @@ -182,6 +199,11 @@ object DeltaSharingOptions extends Logging {
val TIME_TRAVEL_VERSION = "versionAsOf"
val TIME_TRAVEL_TIMESTAMP = "timestampAsOf"

// Option keys under which share credentials can be supplied directly as reader options.
// These four keys make up `validShareCredentialsOptions`.
val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion"
val PROFILE_ENDPOINT = "endpoint"
val PROFILE_BEARER_TOKEN = "bearerToken"
// The value supplied under this key is normalized with `getFormattedTimestamp` when the
// credential options are collected.
val PROFILE_EXPIRATION_TIME = "expirationTime"

val validCdfOptions = Map(
CDF_READ_OPTION -> "",
CDF_READ_OPTION_LEGACY -> "",
Expand All @@ -190,6 +212,13 @@ object DeltaSharingOptions extends Logging {
CDF_START_VERSION -> "",
CDF_END_VERSION -> ""
)

// Option names recognized as share credentials. `prepareShareCredentialsOptions` reads only
// the keys of this map; the empty-string values act as placeholders there.
// NOTE(review): other readers of this map may exist outside this view - confirm before
// changing it to a Set.
val validShareCredentialsOptions = Map(
PROFILE_SHARE_CREDENTIALS_VERSION -> "",
PROFILE_ENDPOINT -> "",
PROFILE_BEARER_TOKEN -> "",
PROFILE_EXPIRATION_TIME -> ""
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,34 @@ trait DeltaSharingProfileProvider {
refresher: Option[String] => TableRefreshResult): Option[String] => TableRefreshResult = {
refresher
}

/**
 * Validate that a DeltaSharingProfile has all required fields and valid values.
 *
 * The checks run in a fixed order and the first failure is reported:
 * `shareCredentialsVersion` is present, it is not greater than
 * `DeltaSharingProfile.CURRENT`, `endpoint` is non-null, and `bearerToken` is non-null.
 *
 * @param profile The profile to validate
 * @throws IllegalArgumentException if validation fails
 */
def validate(profile: DeltaSharingProfile): Unit = {
  // Bind the version once instead of calling `Option.get` repeatedly after an isEmpty check.
  val version = profile.shareCredentialsVersion.getOrElse {
    throw new IllegalArgumentException(
      "Cannot find the 'shareCredentialsVersion' field in the profile")
  }

  if (version > DeltaSharingProfile.CURRENT) {
    throw new IllegalArgumentException(
      "'shareCredentialsVersion' in the profile is " +
        s"$version which is too new. The current release " +
        s"supports version ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer " +
        "release.")
  }

  // NOTE(review): these fields are presumably null when absent from the deserialized
  // profile - confirm against DeltaSharingProfile's definition.
  if (profile.endpoint == null) {
    throw new IllegalArgumentException("Cannot find the 'endpoint' field in the profile")
  }
  if (profile.bearerToken == null) {
    throw new IllegalArgumentException("Cannot find the 'bearerToken' field in the profile")
  }
}
}

/**
Expand All @@ -71,24 +99,30 @@ private[sharing] class DeltaSharingFileProfileProvider(
} finally {
input.close()
}
if (profile.shareCredentialsVersion.isEmpty) {
throw new IllegalArgumentException(
"Cannot find the 'shareCredentialsVersion' field in the profile file")
}
validate(profile)
profile
}

if (profile.shareCredentialsVersion.get > DeltaSharingProfile.CURRENT) {
throw new IllegalArgumentException(
s"'shareCredentialsVersion' in the profile is " +
s"${profile.shareCredentialsVersion.get} which is too new. The current release " +
s"supports version ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer " +
s"release.")
}
if (profile.endpoint == null) {
throw new IllegalArgumentException("Cannot find the 'endpoint' field in the profile file")
override def getProfile: DeltaSharingProfile = profile
}

/**
* Load [[DeltaSharingProfile]] from options.
*/
private[sharing] class DeltaSharingOptionsProfileProvider(
shareCredentialsOptions: Map[String, String]) extends DeltaSharingProfileProvider {

val profile = {
// Convert string representations of shareCredentialsVersion to Int
val normalizedOptions: Map[String, Any] = shareCredentialsOptions.map {
case (DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION, v) =>
DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION -> v.toInt
case (k, v) => k -> v
}
if (profile.bearerToken == null) {
throw new IllegalArgumentException("Cannot find the 'bearerToken' field in the profile file")
val profile = {
JsonUtils.fromJson[DeltaSharingProfile](JsonUtils.toJson(normalizedOptions))
}
validate(profile)
profile
}

Expand Down
45 changes: 30 additions & 15 deletions spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,31 @@ private[sharing] object RemoteDeltaLog {
}

/**
* Parse the user provided path `profile_file#share.schema.share` to
* `(profile_file, share, schema, share)`.
* Validate and parse the user-provided path.
* When credentials are provided via options, the path must be `share.schema.table`.
* Otherwise, the path must include the profile file: `profile_file#share.schema.table`.
*/
def parsePath(path: String): (String, String, String, String) = {
def parsePath(path: String,
shareCredentialsOptions: Map[String, String]): (String, String, String, String) = {
val shapeIndex = path.lastIndexOf('#')
if (shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
val (profileFile, tablePath) = {
if (shareCredentialsOptions.nonEmpty && shapeIndex < 0) {
("", path)
}
else if (shareCredentialsOptions.nonEmpty && shapeIndex >= 0) {
throw new IllegalArgumentException(
"cannot specify both share credentials options and a profile file path")
} else if (shareCredentialsOptions.isEmpty && shapeIndex < 0) {
throw new IllegalArgumentException(s"path $path is not valid")
} else {
(path.substring(0, shapeIndex), path.substring(shapeIndex + 1))
}
}
val profileFile = path.substring(0, shapeIndex)
val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1)
val tableSplits = tablePath.split("\\.", -1)
if (tableSplits.length != 3) {
throw new IllegalArgumentException(s"path $path is not valid")
}
if (profileFile.isEmpty || tableSplits(0).isEmpty ||
if ((profileFile.isEmpty && shareCredentialsOptions.isEmpty) || tableSplits(0).isEmpty ||
tableSplits(1).isEmpty || tableSplits(2).isEmpty) {
throw new IllegalArgumentException(s"path $path is not valid")
}
Expand All @@ -142,20 +153,24 @@ private[sharing] object RemoteDeltaLog {
s"_${formattedDateTime}_${uuid}"
}

def apply(path: String, forStreaming: Boolean = false): RemoteDeltaLog = {
def apply(path: String,
shareCredentialsOptions: Map[String, String],
forStreaming: Boolean = false): RemoteDeltaLog = {
val sqlConf = SparkSession.active.sessionState.conf
val (profileFile, share, schema, table) = parsePath(path)

val profileProviderclass =
sqlConf.getConfString("spark.delta.sharing.profile.provider.class",
"io.delta.sharing.spark.DeltaSharingFileProfileProvider")
val (profileFile, share, schema, table) = parsePath(path, shareCredentialsOptions)

val profileProvider: DeltaSharingProfileProvider =
val profileProvider: DeltaSharingProfileProvider = if (shareCredentialsOptions.nonEmpty) {
new DeltaSharingOptionsProfileProvider(shareCredentialsOptions)
} else {
val profileProviderclass =
sqlConf.getConfString("spark.delta.sharing.profile.provider.class",
"io.delta.sharing.spark.DeltaSharingFileProfileProvider")
Class.forName(profileProviderclass)
.getConstructor(classOf[Configuration], classOf[String])
.newInstance(SparkSession.active.sessionState.newHadoopConf(),
profileFile)
.asInstanceOf[DeltaSharingProfileProvider]
}

val deltaSharingTable = DeltaSharingTable(name = table, schema = schema, share = share)
// This is a flag to test the local https server. Should never be used in production.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
)
}
assert(e.getMessage.contains(
"Cannot find the 'shareCredentialsVersion' field in the profile file"))
"Cannot find the 'shareCredentialsVersion' field in the profile"))
}

test("shareCredentialsVersion is not supported") {
Expand Down Expand Up @@ -110,7 +110,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
null
)
}
assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile file"))
assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile"))
}

test("bearerToken is missing") {
Expand All @@ -124,7 +124,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite {
null
)
}
assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile file"))
assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile"))
}

test("unknown field should be ignored") {
Expand Down
Loading