-
Notifications
You must be signed in to change notification settings - Fork 2k
Add reflection-based auto-registration for ServerSidePlanningClientFactory #5907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
cf9c88b
d8df656
1ecfc3b
2371d01
7758d5b
fcf1333
2f1ddf2
c6ccf01
b9fedac
2f60c19
c81fbc5
27e2588
1385732
005a9db
b984757
fda4644
9599b17
db2c72f
e26657c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.delta.serverSidePlanning | ||
|
|
||
| import org.apache.spark.sql.QueryTest | ||
| import org.apache.spark.sql.test.SharedSparkSession | ||
|
|
||
| /** | ||
| * Tests for ServerSidePlanningClientFactory auto-registration with | ||
| * IcebergRESTCatalogPlanningClientFactory. | ||
| * | ||
| * These tests verify that the ServiceLoader-based auto-registration mechanism | ||
| * correctly discovers and registers the IcebergRESTCatalogPlanningClientFactory | ||
| * when it's on the classpath. | ||
| */ | ||
| class ServerSidePlanningClientFactoryAutoRegistrationSuite | ||
| extends QueryTest | ||
| with SharedSparkSession { | ||
|
|
||
| override def afterEach(): Unit = { | ||
| try { | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| } finally { | ||
| super.afterEach() | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Execute test block with clean factory state (setup + teardown). | ||
| */ | ||
| private def withCleanFactory[T](testFn: => T): T = { | ||
| try { | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| testFn | ||
| } finally { | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| } | ||
| } | ||
|
|
||
| test("auto-registration succeeds when IcebergRESTCatalogPlanningClientFactory " + | ||
| "is on classpath") { | ||
| withCleanFactory { | ||
| // Verify factory is not registered initially | ||
| assert(!ServerSidePlanningClientFactory.isFactoryRegistered(), | ||
| "Factory should not be registered initially") | ||
| assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isEmpty, | ||
| "Factory info should be empty initially") | ||
|
|
||
| // Calling getFactory() should trigger auto-registration | ||
| val factory = ServerSidePlanningClientFactory.getFactory() | ||
|
|
||
| // Verify factory is successfully registered | ||
| assert(factory != null, "Factory should not be null after auto-registration") | ||
| assert(ServerSidePlanningClientFactory.isFactoryRegistered(), | ||
| "Factory should be registered after getFactory() call") | ||
|
|
||
| // Verify it's the correct type | ||
| val factoryInfo = ServerSidePlanningClientFactory.getRegisteredFactoryName() | ||
| assert(factoryInfo.isDefined, | ||
| "Factory info should be defined after auto-registration") | ||
| assert(factoryInfo.get.contains("IcebergRESTCatalogPlanningClientFactory"), | ||
| s"Expected IcebergRESTCatalogPlanningClientFactory, got: ${factoryInfo.get}") | ||
| } | ||
| } | ||
|
|
||
| test("autoRegistrationAttempted flag prevents multiple registration attempts") { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is nothing specific to iceberg client..it's testing clearing which is already covered in the spark module tests. |
||
| withCleanFactory { | ||
| // First call triggers auto-registration | ||
| val factory1 = ServerSidePlanningClientFactory.getFactory() | ||
| val factoryInfo1 = ServerSidePlanningClientFactory.getRegisteredFactoryName() | ||
|
|
||
| // Multiple calls should return the same cached instance | ||
| val factory2 = ServerSidePlanningClientFactory.getFactory() | ||
| val factory3 = ServerSidePlanningClientFactory.getFactory() | ||
|
|
||
| // Verify all calls return the same instance (reference equality) | ||
| assert(factory1 eq factory2, | ||
| "Second getFactory() call should return cached instance") | ||
| assert(factory2 eq factory3, | ||
| "Third getFactory() call should return cached instance") | ||
|
|
||
| // Verify factory info remains consistent | ||
| assert(ServerSidePlanningClientFactory.getRegisteredFactoryName() == factoryInfo1, | ||
| "Factory info should remain consistent across multiple calls") | ||
|
|
||
| // After clearFactory(), should allow fresh registration | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| val factory4 = ServerSidePlanningClientFactory.getFactory() | ||
|
|
||
| // Verify new registration occurred | ||
| assert(ServerSidePlanningClientFactory.isFactoryRegistered(), | ||
| "Factory should be registered after clearFactory() and getFactory()") | ||
| assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isDefined, | ||
| "Factory info should be defined after fresh registration") | ||
| assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().get | ||
| .contains("IcebergRESTCatalogPlanningClientFactory"), | ||
| "Fresh registration should register Iceberg factory") | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,41 +70,92 @@ private[serverSidePlanning] trait ServerSidePlanningClientFactory { | |
| } | ||
|
|
||
| /** | ||
| * Registry for client factories. Can be configured for testing or to provide | ||
| * production implementations (e.g., IcebergRESTCatalogPlanningClientFactory). | ||
| * | ||
| * By default, no factory is registered. Production code should register an appropriate | ||
| * factory implementation before attempting to create clients. | ||
| * Registry for client factories. Automatically discovers and registers implementations | ||
| * using reflection-based auto-discovery on first access to the factory. Manual registration | ||
| * using setFactory() is only needed for testing or to override the auto-discovered factory. | ||
| */ | ||
| private[serverSidePlanning] object ServerSidePlanningClientFactory { | ||
| // Fully qualified class name for auto-registration via reflection | ||
| private val ICEBERG_FACTORY_CLASS_NAME = | ||
| "org.apache.spark.sql.delta.serverSidePlanning.IcebergRESTCatalogPlanningClientFactory" | ||
|
|
||
| @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None | ||
| @volatile private var autoRegistrationAttempted: Boolean = false | ||
|
|
||
| // Lazy initialization - only runs when getFactory() is called and no factory is set. | ||
| // Uses reflection to load the hardcoded IcebergRESTCatalogPlanningClientFactory class. | ||
| private def tryAutoRegisterFactory(): Unit = { | ||
| // Double-checked locking pattern to ensure initialization happens only once | ||
| if (!autoRegistrationAttempted) { | ||
| synchronized { | ||
| if (!autoRegistrationAttempted) { | ||
| autoRegistrationAttempted = true | ||
|
|
||
| try { | ||
| // Use reflection to load the Iceberg factory class | ||
| // scalastyle:off classforname | ||
| val clazz = Class.forName(ICEBERG_FACTORY_CLASS_NAME) | ||
| // scalastyle:on classforname | ||
| val factory = clazz.getConstructor().newInstance() | ||
| .asInstanceOf[ServerSidePlanningClientFactory] | ||
| registeredFactory = Some(factory) | ||
murali-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } catch { | ||
| case e: Exception => | ||
| throw new IllegalStateException( | ||
| "Unable to load IcebergRESTCatalogPlanningClientFactory " + | ||
| "for server-side planning. Ensure the delta-iceberg JAR is on the " + | ||
| "classpath and compatible with this Delta version.", | ||
| e) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Set a factory for production use or testing. | ||
| * Set a factory, overriding any auto-registered factory. | ||
| * Synchronized to prevent race conditions with auto-registration. | ||
| */ | ||
| private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = { | ||
| registeredFactory = Some(factory) | ||
| synchronized { | ||
| registeredFactory = Some(factory) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Clear the registered factory. | ||
| * Synchronized to ensure atomic reset of both flags. | ||
| */ | ||
| private[serverSidePlanning] def clearFactory(): Unit = { | ||
| registeredFactory = None | ||
| synchronized { | ||
| registeredFactory = None | ||
| autoRegistrationAttempted = false | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Get the currently registered factory. | ||
| * Throws IllegalStateException if no factory has been registered. | ||
| * Throws IllegalStateException if no factory has been registered (either via reflection-based | ||
| * auto-discovery or explicit setFactory() call). | ||
| */ | ||
| def getFactory(): ServerSidePlanningClientFactory = { | ||
| // Try auto-registration if not already attempted and no factory is manually set | ||
| if (registeredFactory.isEmpty) { | ||
| tryAutoRegisterFactory() | ||
| } | ||
|
|
||
| registeredFactory.getOrElse { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't the dead access of the registeredFactory be in synchronized as well? Alternatively registeredFactory is marked as volatile and the only synchronization needed is only in the auto registration so that multiple thread don't register concurrently. |
||
| throw new IllegalStateException( | ||
| "No ServerSidePlanningClientFactory has been registered. " + | ||
| "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.") | ||
| "Ensure delta-iceberg JAR is on the classpath for auto-registration, " + | ||
| "or call ServerSidePlanningClientFactory.setFactory() to register manually.") | ||
| } | ||
| } | ||
|
|
||
| def isFactoryRegistered(): Boolean = registeredFactory.isDefined | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are these one line functions needed?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These seem to be used only for testing. Seems unnecessary additional public methods. |
||
|
|
||
| def getRegisteredFactoryName(): Option[String] = registeredFactory.map(_.getClass.getName) | ||
|
|
||
| /** | ||
| * Convenience method to create a client from metadata using the registered factory. | ||
| */ | ||
murali-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| /* | ||
| * Copyright (2025) The Delta Lake Project Authors. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 2026 |
||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.delta.serverSidePlanning | ||
|
|
||
| import org.apache.spark.sql.QueryTest | ||
| import org.apache.spark.sql.test.SharedSparkSession | ||
|
|
||
| /** | ||
| * Unit tests for ServerSidePlanningClientFactory core functionality. | ||
| * Tests manual factory registration, state management, and lifecycle. | ||
| */ | ||
| class ServerSidePlanningClientFactorySuite extends QueryTest with SharedSparkSession { | ||
|
|
||
| override def afterEach(): Unit = { | ||
| try { | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| } finally { | ||
| super.afterEach() | ||
| } | ||
| } | ||
|
|
||
| // ========== Test Infrastructure ========== | ||
|
|
||
| /** | ||
| * Execute test block with clean factory state (setup + teardown). | ||
| * Ensures factory is cleared before and after the test. | ||
| */ | ||
| private def withCleanFactory[T](testFn: => T): T = { | ||
| try { | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| testFn | ||
| } finally { | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Assert that factory is registered and matches expected type. | ||
| * Includes descriptive error messages. | ||
| */ | ||
| private def assertFactoryType( | ||
| expectedType: String, | ||
| context: String = ""): Unit = { | ||
| val prefix = if (context.nonEmpty) s"[$context] " else "" | ||
|
|
||
| assert(ServerSidePlanningClientFactory.isFactoryRegistered(), | ||
| s"${prefix}Factory should be registered") | ||
|
|
||
| val factoryInfo = ServerSidePlanningClientFactory.getRegisteredFactoryName() | ||
| assert(factoryInfo.isDefined, | ||
| s"${prefix}Factory info should be defined") | ||
| assert(factoryInfo.get.contains(expectedType), | ||
| s"${prefix}Expected factory type=$expectedType, got: ${factoryInfo.get}") | ||
| } | ||
|
|
||
| /** | ||
| * Assert that factory is NOT registered. | ||
| */ | ||
| private def assertNoFactory(context: String = ""): Unit = { | ||
| val prefix = if (context.nonEmpty) s"[$context] " else "" | ||
|
|
||
| assert(!ServerSidePlanningClientFactory.isFactoryRegistered(), | ||
| s"${prefix}Factory should not be registered") | ||
| assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isEmpty, | ||
| s"${prefix}Factory info should be empty") | ||
| } | ||
|
|
||
| // ========== Tests ========== | ||
|
|
||
| test("manual setFactory can replace existing factory") { | ||
| withCleanFactory { | ||
| // Set first factory | ||
| val firstFactory = new TestServerSidePlanningClientFactory() | ||
| ServerSidePlanningClientFactory.setFactory(firstFactory) | ||
| assert(ServerSidePlanningClientFactory.getFactory() eq firstFactory, | ||
| "Should return first factory") | ||
|
|
||
| // Replace with second factory | ||
| val secondFactory = new TestServerSidePlanningClientFactory() | ||
| ServerSidePlanningClientFactory.setFactory(secondFactory) | ||
|
|
||
| // Verify replacement | ||
| val retrievedFactory = ServerSidePlanningClientFactory.getFactory() | ||
| assert(retrievedFactory eq secondFactory, | ||
| "getFactory() should return the second factory after replacement") | ||
| assert(!(retrievedFactory eq firstFactory), | ||
| "Should not return the first factory") | ||
| } | ||
| } | ||
|
|
||
| test("getFactory returns same instance across multiple calls") { | ||
| withCleanFactory { | ||
| val testFactory = new TestServerSidePlanningClientFactory() | ||
| ServerSidePlanningClientFactory.setFactory(testFactory) | ||
|
|
||
| val factory1 = ServerSidePlanningClientFactory.getFactory() | ||
| val factory2 = ServerSidePlanningClientFactory.getFactory() | ||
| val factory3 = ServerSidePlanningClientFactory.getFactory() | ||
|
|
||
| assert(factory1 eq factory2, "Second call should return same instance as first") | ||
| assert(factory2 eq factory3, "Third call should return same instance as second") | ||
| assert(factory1 eq testFactory, "Should return the originally set factory") | ||
| } | ||
| } | ||
|
|
||
| test("clearFactory resets registration state") { | ||
| withCleanFactory { | ||
| val testFactory = new TestServerSidePlanningClientFactory() | ||
| ServerSidePlanningClientFactory.setFactory(testFactory) | ||
| assert(ServerSidePlanningClientFactory.isFactoryRegistered(), | ||
| "Factory should be registered") | ||
|
|
||
| // Clear factory | ||
| ServerSidePlanningClientFactory.clearFactory() | ||
|
|
||
| // Verify factory is no longer registered | ||
| assertNoFactory("after clearFactory") | ||
| } | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,8 @@ class JarSuite extends AnyFunSuite { | |
| "scala/", | ||
| // e.g. org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.class | ||
| "org/apache/spark/sql/delta/icebergShaded/", | ||
| // Server-side planning support | ||
| "org/apache/spark/sql/delta/serverSidePlanning/", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you need this changes now since you switched to reflection based approach?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm getting some errors without this (java.lang.Exception: Prohibited jar classes found). here's what i think is happening: delta-iceberg has some sort of allowlist so it excludes classes by default (to prevent dupes vs delta-spark i guess). and i need to allow it in this suite so that it does not complain about a new class being present in the jar (due to including it in build.sbt) when it shouldn't . |
||
| // We explicitly include all the /delta/commands/convert classes we want, to ensure we don't | ||
| // accidentally pull in some from delta-spark package. | ||
| "org/apache/spark/sql/delta/commands/convert/IcebergFileManifest", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is in iceberg module specifically testing iceberg factory is getting auto registered ...shouldnt it be named in the same way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact why do we even need to test this. The only thing that need to be tested outside the other test suite in spark module is that it's iceberg factory by default. That's very tiny test and can't that be added to any existing iceberg module ssp suite??