diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala index fd02c5e11..974810bc3 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala @@ -49,7 +49,7 @@ private[sharing] class DeltaSharingDataSource val options = new DeltaSharingOptions(parameters) val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path) + val deltaLog = RemoteDeltaLog(path, options.shareCredentialsOptions) deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions) } @@ -68,7 +68,7 @@ private[sharing] class DeltaSharingDataSource } val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path, forStreaming = true) + val deltaLog = RemoteDeltaLog(path, options.shareCredentialsOptions, forStreaming = true) val schemaToUse = deltaLog.snapshot().schema if (schemaToUse.isEmpty) { throw DeltaSharingErrors.schemaNotSetException @@ -93,7 +93,7 @@ private[sharing] class DeltaSharingDataSource } val options = new DeltaSharingOptions(parameters) val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) - val deltaLog = RemoteDeltaLog(path, forStreaming = true) + val deltaLog = RemoteDeltaLog(path, options.shareCredentialsOptions, forStreaming = true) DeltaSharingSource(SparkSession.active, deltaLog, options) } diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala index 562e3bf30..4df82642f 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala @@ -95,6 +95,8 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser { val timestampAsOf = options.get(TIME_TRAVEL_TIMESTAMP).map(getFormattedTimestamp(_)) + val shareCredentialsOptions: Map[String, String] = prepareShareCredentialsOptions() + def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined // Parse the input timestamp string and TimestampType, and generate a formatted timestamp string @@ -124,6 +126,21 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser { } } + private def prepareShareCredentialsOptions(): Map[String, String] = { + validShareCredentialsOptions.filter { option => + options.contains(option._1) + }.map { option => + val key = option._1 + val value = key match { + case PROFILE_EXPIRATION_TIME => + getFormattedTimestamp(options.get(key).get) + case _ => + options.get(key).get + } + key -> value + } + } + private def validateOneStartingOption(): Unit = { if (startingTimestamp.isDefined && startingVersion.isDefined) { throw DeltaSharingErrors.versionAndTimestampBothSetException( @@ -182,6 +199,11 @@ object DeltaSharingOptions extends Logging { val TIME_TRAVEL_VERSION = "versionAsOf" val TIME_TRAVEL_TIMESTAMP = "timestampAsOf" + val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion" + val PROFILE_ENDPOINT = "endpoint" + val PROFILE_BEARER_TOKEN = "bearerToken" + val PROFILE_EXPIRATION_TIME = "expirationTime" + val validCdfOptions = Map( CDF_READ_OPTION -> "", CDF_READ_OPTION_LEGACY -> "", @@ -190,6 +212,13 @@ object DeltaSharingOptions extends Logging { CDF_START_VERSION -> "", CDF_END_VERSION -> "" ) + + val validShareCredentialsOptions = Map( + PROFILE_SHARE_CREDENTIALS_VERSION -> "", + PROFILE_ENDPOINT -> "", + PROFILE_BEARER_TOKEN -> "", + PROFILE_EXPIRATION_TIME -> "" + ) } /** diff --git a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingProfileProvider.scala b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingProfileProvider.scala index 360512d6c..f49f2e3e9 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingProfileProvider.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/DeltaSharingProfileProvider.scala @@ -54,6 +54,34 @@ trait DeltaSharingProfileProvider { refresher: Option[String] => TableRefreshResult): Option[String] => TableRefreshResult = { refresher } + + /** + * Validate that a DeltaSharingProfile has all required fields and valid values. + * + * @param profile The profile to validate + * @throws IllegalArgumentException if validation fails + */ + def validate(profile: DeltaSharingProfile): Unit = { + if (profile.shareCredentialsVersion.isEmpty) { + throw new IllegalArgumentException( + "Cannot find the 'shareCredentialsVersion' field in the profile") + } + + if (profile.shareCredentialsVersion.get > DeltaSharingProfile.CURRENT) { + throw new IllegalArgumentException( + s"'shareCredentialsVersion' in the profile is " + + s"${profile.shareCredentialsVersion.get} which is too new. The current release " + + s"supports version ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer " + + s"release.") + } + if (profile.endpoint == null) { + throw new IllegalArgumentException("Cannot find the 'endpoint' field in the profile") + } + if (profile.bearerToken == null) { + throw new IllegalArgumentException("Cannot find the 'bearerToken' field in the profile") + } + + } } /** @@ -71,24 +99,30 @@ private[sharing] class DeltaSharingFileProfileProvider( } finally { input.close() } - if (profile.shareCredentialsVersion.isEmpty) { - throw new IllegalArgumentException( - "Cannot find the 'shareCredentialsVersion' field in the profile file") - } + validate(profile) + profile + } - if (profile.shareCredentialsVersion.get > DeltaSharingProfile.CURRENT) { - throw new IllegalArgumentException( - s"'shareCredentialsVersion' in the profile is " + - s"${profile.shareCredentialsVersion.get} which is too new. The current release " + - s"supports version ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer " + - s"release.") - } - if (profile.endpoint == null) { - throw new IllegalArgumentException("Cannot find the 'endpoint' field in the profile file") + override def getProfile: DeltaSharingProfile = profile +} + +/** + * Load [[DeltaSharingProfile]] from options. + */ +private[sharing] class DeltaSharingOptionsProfileProvider( + shareCredentialsOptions: Map[String, String]) extends DeltaSharingProfileProvider { + + val profile = { + // Convert string representations of shareCredentialsVersion to Int + val normalizedOptions: Map[String, Any] = shareCredentialsOptions.map { + case (DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION, v) => + DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION -> v.toInt + case (k, v) => k -> v } - if (profile.bearerToken == null) { - throw new IllegalArgumentException("Cannot find the 'bearerToken' field in the profile file") + val profile = { + JsonUtils.fromJson[DeltaSharingProfile](JsonUtils.toJson(normalizedOptions)) } + validate(profile) profile } diff --git a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala index 6d58afb4b..f5fd4823f 100644 --- a/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala +++ b/spark/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala @@ -110,20 +110,31 @@ private[sharing] object RemoteDeltaLog { } /** - * Parse the user provided path `profile_file#share.schema.share` to - * `(profile_file, share, schema, share)`. + * Validate and parse user provided path: + * When credentials are provided via options, path format should be `share.schema.table` + * Otherwise, profile file should be included `profile_file#share.schema.table` */ - def parsePath(path: String): (String, String, String, String) = { + def parsePath(path: String, + shareCredentialsOptions: Map[String, String]): (String, String, String, String) = { val shapeIndex = path.lastIndexOf('#') - if (shapeIndex < 0) { - throw new IllegalArgumentException(s"path $path is not valid") + val (profileFile, tablePath) = { + if (shareCredentialsOptions.nonEmpty && shapeIndex < 0) { + ("", path) + } + else if (shareCredentialsOptions.nonEmpty && shapeIndex >= 0) { + throw new IllegalArgumentException( + "cannot specify both share credentials options and a profile file path") + } else if (shareCredentialsOptions.isEmpty && shapeIndex < 0) { + throw new IllegalArgumentException(s"path $path is not valid") + } else { + (path.substring(0, shapeIndex), path.substring(shapeIndex + 1)) + } } - val profileFile = path.substring(0, shapeIndex) - val tableSplits = path.substring(shapeIndex + 1).split("\\.", -1) + val tableSplits = tablePath.split("\\.", -1) if (tableSplits.length != 3) { throw new IllegalArgumentException(s"path $path is not valid") } - if (profileFile.isEmpty || tableSplits(0).isEmpty || + if ((profileFile.isEmpty && shareCredentialsOptions.isEmpty) || tableSplits(0).isEmpty || tableSplits(1).isEmpty || tableSplits(2).isEmpty) { throw new IllegalArgumentException(s"path $path is not valid") } @@ -142,20 +153,24 @@ private[sharing] object RemoteDeltaLog { s"_${formattedDateTime}_${uuid}" } - def apply(path: String, forStreaming: Boolean = false): RemoteDeltaLog = { + def apply(path: String, + shareCredentialsOptions: Map[String, String], + forStreaming: Boolean = false): RemoteDeltaLog = { val sqlConf = SparkSession.active.sessionState.conf - val (profileFile, share, schema, table) = parsePath(path) - - val profileProviderclass = - sqlConf.getConfString("spark.delta.sharing.profile.provider.class", - "io.delta.sharing.spark.DeltaSharingFileProfileProvider") + val (profileFile, share, schema, table) = parsePath(path, shareCredentialsOptions) - val profileProvider: DeltaSharingProfileProvider = + val profileProvider: DeltaSharingProfileProvider = if (shareCredentialsOptions.nonEmpty) { + new DeltaSharingOptionsProfileProvider(shareCredentialsOptions) + } else { + val profileProviderclass = + sqlConf.getConfString("spark.delta.sharing.profile.provider.class", + "io.delta.sharing.spark.DeltaSharingFileProfileProvider") Class.forName(profileProviderclass) .getConstructor(classOf[Configuration], classOf[String]) .newInstance(SparkSession.active.sessionState.newHadoopConf(), profileFile) .asInstanceOf[DeltaSharingProfileProvider] + } val deltaSharingTable = DeltaSharingTable(name = table, schema = schema, share = share) // This is a flag to test the local https server. Should never be used in production. diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingFileProfileProviderSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingFileProfileProviderSuite.scala index e94c6a7f5..07cca586a 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingFileProfileProviderSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingFileProfileProviderSuite.scala @@ -82,7 +82,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite { ) } assert(e.getMessage.contains( - "Cannot find the 'shareCredentialsVersion' field in the profile file")) + "Cannot find the 'shareCredentialsVersion' field in the profile")) } test("shareCredentialsVersion is not supported") { @@ -110,7 +110,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite { null ) } - assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile file")) + assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile")) } test("bearerToken is missing") { @@ -124,7 +124,7 @@ class DeltaSharingFileProfileProviderSuite extends SparkFunSuite { null ) } - assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile file")) + assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile")) } test("unknown field should be ignored") { diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsProfileProviderSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsProfileProviderSuite.scala new file mode 100644 index 000000000..62c57244b --- /dev/null +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsProfileProviderSuite.scala @@ -0,0 +1,149 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import org.apache.spark.SparkFunSuite + + +class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { + + private def testProfile( + shareCredentialsOptions: Map[String, String], expected: DeltaSharingProfile): Unit = { + assert(new DeltaSharingOptionsProfileProvider(shareCredentialsOptions) + .getProfile == expected) + } + + test("parse") { + testProfile( + Map( + "shareCredentialsVersion" -> "1", + "endpoint" -> "foo", + "bearerToken" -> "bar", + "expirationTime" -> "2021-11-12T00:12:29Z" + ), + DeltaSharingProfile( + shareCredentialsVersion = Some(1), + endpoint = "foo", + bearerToken = "bar", + expirationTime = "2021-11-12T00:12:29Z" + ) + ) + } + + test("expirationTime is optional") { + testProfile( + Map( + "shareCredentialsVersion" -> "1", + "endpoint" -> "foo", + "bearerToken" -> "bar" + ), + DeltaSharingProfile( + shareCredentialsVersion = Some(1), + endpoint = "foo", + bearerToken = "bar" + ) + ) + } + + test("shareCredentialsVersion is missing") { + val e = intercept[IllegalArgumentException] { + testProfile( + Map( + "endpoint" -> "foo", + "bearerToken" -> "bar" + ), + null + ) + } + assert(e.getMessage.contains( + "Cannot find the 'shareCredentialsVersion' field in the profile")) + } + + test("shareCredentialsVersion is incorrect") { + val e = intercept[IllegalArgumentException] { + testProfile( + Map( + "shareCredentialsVersion" -> "2", + "endpoint" -> "foo", + "bearerToken" -> "bar" + ), + null + ) + } + assert(e.getMessage.contains( + "'shareCredentialsVersion' in the profile is 2 which is too new. " + + "The current release supports version 1 and below. Please upgrade to a newer release.")) + } + + test("shareCredentialsVersion is not supported") { + val e = intercept[IllegalArgumentException] { + testProfile( + Map( + "shareCredentialsVersion" -> "100" + ), + null + ) + } + assert(e.getMessage.contains( + "'shareCredentialsVersion' in the profile is 100 which is too new.")) + } + + test("endpoint is missing") { + val e = intercept[IllegalArgumentException] { + testProfile( + Map( + "shareCredentialsVersion" -> "1", + "bearerToken" -> "bar" + ), + null + ) + } + assert(e.getMessage.contains("Cannot find the 'endpoint' field in the profile")) + } + + test("bearerToken is missing") { + val e = intercept[IllegalArgumentException] { + testProfile( + Map( + "shareCredentialsVersion" -> "1", + "endpoint" -> "foo" + ), + null + ) + } + assert(e.getMessage.contains("Cannot find the 'bearerToken' field in the profile")) + } + + test("unknown field should be ignored") { + testProfile( + Map( + "shareCredentialsVersion" -> "1", + "endpoint" -> "foo", + "bearerToken" -> "bar", + "expirationTime" -> "2021-11-12T00:12:29Z", + "futureField" -> "xyz" + ), + DeltaSharingProfile( + shareCredentialsVersion = Some(1), + endpoint = "foo", + bearerToken = "bar", + expirationTime = "2021-11-12T00:12:29Z" + ) + ) + } + +} diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala index e09e0ed53..da12fb68e 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala @@ -83,6 +83,26 @@ class DeltaSharingOptionsSuite extends SparkFunSuite { assert(options.options.get("notreservedoption") == Some("random")) } + test("Parse shareCredentials map successfully") { + // profile as opts + var options = new DeltaSharingOptions(Map( + "shareCredentialsVersion" -> "1", + "endpoint" -> "foo", + "bearerToken" -> "bar", + "expirationTime" -> "2022-01-01T00:00:00-02:00" + )) + + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION) == Some("1")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_ENDPOINT) == Some("foo")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_BEARER_TOKEN) == Some("bar")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_EXPIRATION_TIME) == Some("2022-01-01T02:00:00Z")) + + } + test("Parse cdfOptions map successfully") { var options = new DeltaSharingOptions(Map( "readChangeFeed" -> "true", diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala index 9b39ee131..284865803 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceCDFSuite.scala @@ -65,8 +65,9 @@ class DeltaSharingSourceCDFSuite extends QueryTest // VERSION 3: UPDATE 4 rows, 4 cdf files, 8 cdf rows // VERSION 4: REMOVE 4 rows, 2 remove files lazy val cdfTablePath = testProfileFile.getCanonicalPath + "#share8.default.streaming_cdf_table" + lazy val shareCredentialsOptions: Map[String, String] = Map.empty - lazy val deltaLog = RemoteDeltaLog(cdfTablePath, forStreaming = true) + lazy val deltaLog = RemoteDeltaLog(cdfTablePath, shareCredentialsOptions, forStreaming = true) def getSource(parameters: Map[String, String]): DeltaSharingSource = { val options = new DeltaSharingOptions(parameters ++ Map("readChangeFeed" -> "true")) diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala index ed121bccc..6ebe457a7 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSourceSuite.scala @@ -66,8 +66,9 @@ class DeltaSharingSourceSuite extends QueryTest "#share8.default.streaming_notnull_to_null" lazy val toNotNullTable = testProfileFile.getCanonicalPath + "#share8.default.streaming_null_to_notnull" + lazy val shareCredentialsOptions: Map[String, String] = Map.empty - lazy val deltaLog = RemoteDeltaLog(tablePath, forStreaming = true) + lazy val deltaLog = RemoteDeltaLog(tablePath, shareCredentialsOptions, forStreaming = true) def getSource(parameters: Map[String, String]): DeltaSharingSource = { val options = new DeltaSharingOptions(parameters) diff --git a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala index 53aaa8bc7..15cdf8758 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/DeltaSharingSuite.scala @@ -33,6 +33,20 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar import testImplicits._ + integrationTest("table1 passing profile with read options") { + val tablePath = "share1.default.table1" + val expected = Seq( + Row(sqlTimestamp("2021-04-27 23:32:02.07"), sqlDate("2021-04-28")), + Row(sqlTimestamp("2021-04-27 23:32:22.421"), sqlDate("2021-04-28")) + ) + val readOptions = Map( + "endpoint" -> s"https://localhost:$TEST_PORT/delta-sharing", + "bearerToken" -> "dapi5e3574ec767ca1548ae5bbed1a2dc04d", + "shareCredentialsVersion" -> "1" + ) + checkAnswer(spark.read.format("deltaSharing").options(readOptions).load(tablePath), expected) + } + integrationTest("table1") { val tablePath = testProfileFile.getCanonicalPath + "#share1.default.table1" val expected = Seq( diff --git a/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala b/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala index 3956ee55c..b1e04307c 100644 --- a/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala +++ b/spark/src/test/scala/io/delta/sharing/spark/RemoteDeltaLogSuite.scala @@ -39,28 +39,57 @@ import io.delta.sharing.spark.util.JsonUtils class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession { test("parsePath") { - assert(RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c") == ("file:///foo/bar", "a", "b", "c")) - assert(RemoteDeltaLog.parsePath("file:///foo/bar#bar#a.b.c") == + lazy val emptyShareCredentialsOptions: Map[String, String] = Map.empty + assert(RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c", emptyShareCredentialsOptions) == + ("file:///foo/bar", "a", "b", "c")) + assert(RemoteDeltaLog.parsePath("file:///foo/bar#bar#a.b.c", emptyShareCredentialsOptions) == ("file:///foo/bar#bar", "a", "b", "c")) - assert(RemoteDeltaLog.parsePath("file:///foo/bar#bar#a.b.c ") == + assert(RemoteDeltaLog.parsePath("file:///foo/bar#bar#a.b.c ", emptyShareCredentialsOptions) == ("file:///foo/bar#bar", "a", "b", "c ")) intercept[IllegalArgumentException] { - RemoteDeltaLog.parsePath("file:///foo/bar") + RemoteDeltaLog.parsePath("file:///foo/bar", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - RemoteDeltaLog.parsePath("file:///foo/bar#a.b") + RemoteDeltaLog.parsePath("file:///foo/bar#a.b", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c.d") + RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c.d", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - RemoteDeltaLog.parsePath("#a.b.c") + RemoteDeltaLog.parsePath("#a.b.c", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - RemoteDeltaLog.parsePath("foo#a.b.") + RemoteDeltaLog.parsePath("foo#a.b.", emptyShareCredentialsOptions) } intercept[IllegalArgumentException] { - RemoteDeltaLog.parsePath("foo#a.b.c.") + RemoteDeltaLog.parsePath("foo#a.b.c.", emptyShareCredentialsOptions) + } + } + + test("parsePath with options") { + lazy val shareCredentialsOptions: Map[String, String] = Map("key" -> "value") + assert(RemoteDeltaLog.parsePath("a.b.c", shareCredentialsOptions) == + ("", "a", "b", "c")) + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("file:///foo/bar#a.b.c.d", shareCredentialsOptions) + } + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("#a.b.c", shareCredentialsOptions) + } + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("", shareCredentialsOptions) + } + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("a.b", shareCredentialsOptions) + } + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("a.b.c.d", shareCredentialsOptions) + } + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("a.b.", shareCredentialsOptions) + } + intercept[IllegalArgumentException] { + RemoteDeltaLog.parsePath("a.b.c.", shareCredentialsOptions) } } @@ -450,4 +479,45 @@ class RemoteDeltaLogSuite extends SparkFunSuite with SharedSparkSession { assert(TestDeltaSharingClient.limits === Seq(2L, 3L)) } } + + test("RemoteDeltaLog path") { + // Create a dummy table path + val testProfileFile = Files.createTempFile("delta-test", ".share").toFile + FileUtils.writeStringToFile(testProfileFile, + s"""{ + | "shareCredentialsVersion": 1, + | "endpoint": "http://localhost:12345/delta-sharing", + | "bearerToken": "xxxxx" + |}""".stripMargin, UTF_8) + val tablePath = s"${testProfileFile.getCanonicalPath}#share.schema.table" + lazy val shareCredentialsOptions: Map[String, String] = Map.empty + + spark.sessionState.conf.setConfString( + "spark.delta.sharing.client.sparkParquetIOCache.enabled", "false") + // Append timestamp suffix + // #share.schema.table_yyyyMMdd_HHmmss_uuid + val deltaLog2 = RemoteDeltaLog(tablePath, shareCredentialsOptions) + assert(deltaLog2.path.toString.split("#")(1).split("_").length == 4) + val snapshot2 = deltaLog2.snapshot() + assert(snapshot2.getTablePath.toString.split("#")(1).split("_").length == 4) + } + + test("RemoteDeltaLog path with options") { + lazy val tablePath = "share.schema.table" + lazy val shareCredentialsOptions: Map[String, String] = Map( + "shareCredentialsVersion" -> "1", + "endpoint" -> "foo", + "bearerToken" -> "bar", + "expirationTime" -> "2021-11-12T00:12:29Z" + ) + + spark.sessionState.conf.setConfString( + "spark.delta.sharing.client.sparkParquetIOCache.enabled", "false") + // Append timestamp suffix + // share.schema.table_yyyyMMdd_HHmmss_uuid + val deltaLog2 = RemoteDeltaLog(tablePath, shareCredentialsOptions) + assert(deltaLog2.path.toString.split("\\.")(2).split("_").length == 4) + val snapshot2 = deltaLog2.snapshot() + assert(snapshot2.getTablePath.toString.split("\\.")(2).split("_").length == 4) + } }