diff --git a/build.sbt b/build.sbt index d565420a572..97b864d3fb7 100644 --- a/build.sbt +++ b/build.sbt @@ -1105,6 +1105,8 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar")) val deltaIcebergSparkIncludePrefixes = Seq( // We want everything from this package "org/apache/spark/sql/delta/icebergShaded", + // Server-side planning support + "org/apache/spark/sql/delta/serverSidePlanning", // We only want the files in this project from this package. e.g. we want to exclude // org/apache/spark/sql/delta/commands/convert/ConvertTargetFile.class (from delta-spark project). diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClientFactoryAutoRegistrationSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClientFactoryAutoRegistrationSuite.scala new file mode 100644 index 00000000000..50f5ecd6c85 --- /dev/null +++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClientFactoryAutoRegistrationSuite.scala @@ -0,0 +1,114 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests for ServerSidePlanningClientFactory auto-registration with + * IcebergRESTCatalogPlanningClientFactory. 
+ * + * These tests verify that the reflection-based auto-registration mechanism + * correctly discovers and registers the IcebergRESTCatalogPlanningClientFactory + * when it's on the classpath. + */ +class ServerSidePlanningClientFactoryAutoRegistrationSuite + extends QueryTest + with SharedSparkSession { + + override def afterEach(): Unit = { + try { + ServerSidePlanningClientFactory.clearFactory() + } finally { + super.afterEach() + } + } + + /** + * Execute test block with clean factory state (setup + teardown). + */ + private def withCleanFactory[T](testFn: => T): T = { + try { + ServerSidePlanningClientFactory.clearFactory() + testFn + } finally { + ServerSidePlanningClientFactory.clearFactory() + } + } + + test("auto-registration succeeds when IcebergRESTCatalogPlanningClientFactory " + + "is on classpath") { + withCleanFactory { + // Verify factory is not registered initially + assert(!ServerSidePlanningClientFactory.isFactoryRegistered(), + "Factory should not be registered initially") + assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isEmpty, + "Factory info should be empty initially") + + // Calling getFactory() should trigger auto-registration + val factory = ServerSidePlanningClientFactory.getFactory() + + // Verify factory is successfully registered + assert(factory != null, "Factory should not be null after auto-registration") + assert(ServerSidePlanningClientFactory.isFactoryRegistered(), + "Factory should be registered after getFactory() call") + + // Verify it's the correct type + val factoryInfo = ServerSidePlanningClientFactory.getRegisteredFactoryName() + assert(factoryInfo.isDefined, + "Factory info should be defined after auto-registration") + assert(factoryInfo.get.contains("IcebergRESTCatalogPlanningClientFactory"), + s"Expected IcebergRESTCatalogPlanningClientFactory, got: ${factoryInfo.get}") + } + } + + test("autoRegistrationAttempted flag prevents multiple registration attempts") { + withCleanFactory { + // First 
call triggers auto-registration + val factory1 = ServerSidePlanningClientFactory.getFactory() + val factoryInfo1 = ServerSidePlanningClientFactory.getRegisteredFactoryName() + + // Multiple calls should return the same cached instance + val factory2 = ServerSidePlanningClientFactory.getFactory() + val factory3 = ServerSidePlanningClientFactory.getFactory() + + // Verify all calls return the same instance (reference equality) + assert(factory1 eq factory2, + "Second getFactory() call should return cached instance") + assert(factory2 eq factory3, + "Third getFactory() call should return cached instance") + + // Verify factory info remains consistent + assert(ServerSidePlanningClientFactory.getRegisteredFactoryName() == factoryInfo1, + "Factory info should remain consistent across multiple calls") + + // After clearFactory(), should allow fresh registration + ServerSidePlanningClientFactory.clearFactory() + val factory4 = ServerSidePlanningClientFactory.getFactory() + + // Verify new registration occurred + assert(ServerSidePlanningClientFactory.isFactoryRegistered(), + "Factory should be registered after clearFactory() and getFactory()") + assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isDefined, + "Factory info should be defined after fresh registration") + assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().get + .contains("IcebergRESTCatalogPlanningClientFactory"), + "Fresh registration should register Iceberg factory") + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala index f498589c988..a1f68319ccf 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClient.scala @@ -70,41 +70,92 @@ private[serverSidePlanning] trait 
ServerSidePlanningClientFactory { } /** - * Registry for client factories. Can be configured for testing or to provide - * production implementations (e.g., IcebergRESTCatalogPlanningClientFactory). - * - * By default, no factory is registered. Production code should register an appropriate - * factory implementation before attempting to create clients. + * Registry for client factories. Automatically discovers and registers implementations + * using reflection-based auto-discovery on first access to the factory. Manual registration + * using setFactory() is only needed for testing or to override the auto-discovered factory. */ private[serverSidePlanning] object ServerSidePlanningClientFactory { + // Fully qualified class name for auto-registration via reflection + private val ICEBERG_FACTORY_CLASS_NAME = + "org.apache.spark.sql.delta.serverSidePlanning.IcebergRESTCatalogPlanningClientFactory" + @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None + @volatile private var autoRegistrationAttempted: Boolean = false + + // Lazy initialization - only runs when getFactory() is called and no factory is set. + // Uses reflection to load the hardcoded IcebergRESTCatalogPlanningClientFactory class. + private def tryAutoRegisterFactory(): Unit = { + // Double-checked locking pattern to ensure initialization happens only once + if (!autoRegistrationAttempted) { + synchronized { + if (!autoRegistrationAttempted) { + autoRegistrationAttempted = true + + try { + // Use reflection to load the Iceberg factory class + // scalastyle:off classforname + val clazz = Class.forName(ICEBERG_FACTORY_CLASS_NAME) + // scalastyle:on classforname + val factory = clazz.getConstructor().newInstance() + .asInstanceOf[ServerSidePlanningClientFactory] + registeredFactory = Some(factory) + } catch { + case e: Exception => + throw new IllegalStateException( + "Unable to load IcebergRESTCatalogPlanningClientFactory " + + "for server-side planning. 
Ensure the delta-iceberg JAR is on the " + + "classpath and compatible with this Delta version.", + e) + } + } + } + } + } /** - * Set a factory for production use or testing. + * Set a factory, overriding any auto-registered factory. + * Synchronized to prevent race conditions with auto-registration. */ private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = { - registeredFactory = Some(factory) + synchronized { + registeredFactory = Some(factory) + } } /** * Clear the registered factory. + * Synchronized to ensure atomic reset of both flags. */ private[serverSidePlanning] def clearFactory(): Unit = { - registeredFactory = None + synchronized { + registeredFactory = None + autoRegistrationAttempted = false + } } /** * Get the currently registered factory. - * Throws IllegalStateException if no factory has been registered. + * Throws IllegalStateException if no factory has been registered (either via reflection-based + * auto-discovery or explicit setFactory() call). */ def getFactory(): ServerSidePlanningClientFactory = { + // Try auto-registration if not already attempted and no factory is manually set + if (registeredFactory.isEmpty) { + tryAutoRegisterFactory() + } + registeredFactory.getOrElse { throw new IllegalStateException( "No ServerSidePlanningClientFactory has been registered. " + - "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.") + "Ensure delta-iceberg JAR is on the classpath for auto-registration, " + + "or call ServerSidePlanningClientFactory.setFactory() to register manually.") } } + def isFactoryRegistered(): Boolean = registeredFactory.isDefined + + def getRegisteredFactoryName(): Option[String] = registeredFactory.map(_.getClass.getName) + /** * Convenience method to create a client from metadata using the registered factory. 
*/ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClientFactorySuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClientFactorySuite.scala new file mode 100644 index 00000000000..5fa25fe9360 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/serverSidePlanning/ServerSidePlanningClientFactorySuite.scala @@ -0,0 +1,135 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.serverSidePlanning + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Unit tests for ServerSidePlanningClientFactory core functionality. + * Tests manual factory registration, state management, and lifecycle. + */ +class ServerSidePlanningClientFactorySuite extends QueryTest with SharedSparkSession { + + override def afterEach(): Unit = { + try { + ServerSidePlanningClientFactory.clearFactory() + } finally { + super.afterEach() + } + } + + // ========== Test Infrastructure ========== + + /** + * Execute test block with clean factory state (setup + teardown). + * Ensures factory is cleared before and after the test. 
+ */ + private def withCleanFactory[T](testFn: => T): T = { + try { + ServerSidePlanningClientFactory.clearFactory() + testFn + } finally { + ServerSidePlanningClientFactory.clearFactory() + } + } + + /** + * Assert that factory is registered and matches expected type. + * Includes descriptive error messages. + */ + private def assertFactoryType( + expectedType: String, + context: String = ""): Unit = { + val prefix = if (context.nonEmpty) s"[$context] " else "" + + assert(ServerSidePlanningClientFactory.isFactoryRegistered(), + s"${prefix}Factory should be registered") + + val factoryInfo = ServerSidePlanningClientFactory.getRegisteredFactoryName() + assert(factoryInfo.isDefined, + s"${prefix}Factory info should be defined") + assert(factoryInfo.get.contains(expectedType), + s"${prefix}Expected factory type=$expectedType, got: ${factoryInfo.get}") + } + + /** + * Assert that factory is NOT registered. + */ + private def assertNoFactory(context: String = ""): Unit = { + val prefix = if (context.nonEmpty) s"[$context] " else "" + + assert(!ServerSidePlanningClientFactory.isFactoryRegistered(), + s"${prefix}Factory should not be registered") + assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isEmpty, + s"${prefix}Factory info should be empty") + } + + // ========== Tests ========== + + test("manual setFactory can replace existing factory") { + withCleanFactory { + // Set first factory + val firstFactory = new TestServerSidePlanningClientFactory() + ServerSidePlanningClientFactory.setFactory(firstFactory) + assert(ServerSidePlanningClientFactory.getFactory() eq firstFactory, + "Should return first factory") + + // Replace with second factory + val secondFactory = new TestServerSidePlanningClientFactory() + ServerSidePlanningClientFactory.setFactory(secondFactory) + + // Verify replacement + val retrievedFactory = ServerSidePlanningClientFactory.getFactory() + assert(retrievedFactory eq secondFactory, + "getFactory() should return the second factory 
after replacement") + assert(!(retrievedFactory eq firstFactory), + "Should not return the first factory") + } + } + + test("getFactory returns same instance across multiple calls") { + withCleanFactory { + val testFactory = new TestServerSidePlanningClientFactory() + ServerSidePlanningClientFactory.setFactory(testFactory) + + val factory1 = ServerSidePlanningClientFactory.getFactory() + val factory2 = ServerSidePlanningClientFactory.getFactory() + val factory3 = ServerSidePlanningClientFactory.getFactory() + + assert(factory1 eq factory2, "Second call should return same instance as first") + assert(factory2 eq factory3, "Third call should return same instance as second") + assert(factory1 eq testFactory, "Should return the originally set factory") + } + } + + test("clearFactory resets registration state") { + withCleanFactory { + val testFactory = new TestServerSidePlanningClientFactory() + ServerSidePlanningClientFactory.setFactory(testFactory) + assert(ServerSidePlanningClientFactory.isFactoryRegistered(), + "Factory should be registered") + + // Clear factory + ServerSidePlanningClientFactory.clearFactory() + + // Verify factory is no longer registered + assertNoFactory("after clearFactory") + } + } + +} diff --git a/testDeltaIcebergJar/src/test/scala/JarSuite.scala b/testDeltaIcebergJar/src/test/scala/JarSuite.scala index 8a26cc02674..48394dca87f 100644 --- a/testDeltaIcebergJar/src/test/scala/JarSuite.scala +++ b/testDeltaIcebergJar/src/test/scala/JarSuite.scala @@ -32,6 +32,8 @@ class JarSuite extends AnyFunSuite { "scala/", // e.g. org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.class "org/apache/spark/sql/delta/icebergShaded/", + // Server-side planning support + "org/apache/spark/sql/delta/serverSidePlanning/", // We explicitly include all the /delta/commands/convert classes we want, to ensure we don't // accidentally pull in some from delta-spark package. "org/apache/spark/sql/delta/commands/convert/IcebergFileManifest",