-
Notifications
You must be signed in to change notification settings - Fork 2k
Add reflection-based auto-registration for ServerSidePlanningClientFactory #5907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 9 commits
cf9c88b
d8df656
1ecfc3b
2371d01
7758d5b
fcf1333
2f1ddf2
c6ccf01
b9fedac
2f60c19
c81fbc5
27e2588
1385732
005a9db
b984757
fda4644
9599b17
db2c72f
e26657c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,17 +70,48 @@ private[serverSidePlanning] trait ServerSidePlanningClientFactory { | |
| } | ||
|
|
||
| /** | ||
| * Registry for client factories. Can be configured for testing or to provide | ||
| * production implementations (e.g., IcebergRESTCatalogPlanningClientFactory). | ||
| * | ||
| * By default, no factory is registered. Production code should register an appropriate | ||
| * factory implementation before attempting to create clients. | ||
| * Registry for client factories. Automatically discovers and registers implementations | ||
| * using reflection-based auto-discovery on first access to the factory. Manual registration | ||
| * using setFactory() is only needed for testing or to override the auto-discovered factory. | ||
| */ | ||
| private[serverSidePlanning] object ServerSidePlanningClientFactory { | ||
| @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None | ||
| @volatile private var autoRegistrationAttempted: Boolean = false | ||
|
|
||
| // Lazy initialization - only runs when getFactory() is called and no factory is set. | ||
| // Uses reflection to load the hardcoded IcebergRESTCatalogPlanningClientFactory class. | ||
| private def tryAutoRegisterFactory(): Unit = { | ||
| // Double-checked locking pattern to ensure initialization happens only once | ||
| if (!autoRegistrationAttempted) { | ||
| synchronized { | ||
| if (!autoRegistrationAttempted) { | ||
| autoRegistrationAttempted = true | ||
|
|
||
| try { | ||
| // Use reflection to load the Iceberg factory class | ||
| // scalastyle:off classforname | ||
| val clazz = Class.forName( | ||
| "org.apache.spark.sql.delta.serverSidePlanning." + | ||
| "IcebergRESTCatalogPlanningClientFactory") | ||
| // scalastyle:on classforname | ||
| val factory = clazz.getConstructor().newInstance() | ||
| .asInstanceOf[ServerSidePlanningClientFactory] | ||
| registeredFactory = Some(factory) | ||
murali-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } catch { | ||
| case e: Exception => | ||
| throw new IllegalStateException( | ||
| "Unable to load IcebergRESTCatalogPlanningClientFactory " + | ||
| "for server-side planning. Ensure the delta-iceberg JAR is on the " + | ||
| "classpath and compatible with this Delta version.", | ||
| e) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Set a factory for production use or testing. | ||
| * Set a factory, overriding any auto-registered factory. | ||
| */ | ||
| private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = { | ||
| registeredFactory = Some(factory) | ||
|
|
@@ -91,20 +122,40 @@ private[serverSidePlanning] object ServerSidePlanningClientFactory { | |
| */ | ||
| private[serverSidePlanning] def clearFactory(): Unit = { | ||
| registeredFactory = None | ||
| autoRegistrationAttempted = false | ||
| } | ||
|
|
||
| /** | ||
| * Get the currently registered factory. | ||
| * Throws IllegalStateException if no factory has been registered. | ||
| * Throws IllegalStateException if no factory has been registered (either via reflection-based | ||
| * auto-discovery or explicit setFactory() call). | ||
| */ | ||
| def getFactory(): ServerSidePlanningClientFactory = { | ||
| // Try auto-registration if not already attempted and no factory is manually set | ||
| if (registeredFactory.isEmpty) { | ||
| tryAutoRegisterFactory() | ||
| } | ||
|
|
||
| registeredFactory.getOrElse { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't the dead access of the registeredFactory be in synchronized as well? Alternatively registeredFactory is marked as volatile and the only synchronization needed is only in the auto registration so that multiple thread don't register concurrently. |
||
| throw new IllegalStateException( | ||
| "No ServerSidePlanningClientFactory has been registered. " + | ||
| "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.") | ||
| "Ensure delta-iceberg JAR is on the classpath for auto-registration, " + | ||
| "or call ServerSidePlanningClientFactory.setFactory() to register manually.") | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Check if a factory is currently registered (either via ServiceLoader or setFactory()). | ||
| * Useful for testing or conditional logic. | ||
| */ | ||
| def isFactoryRegistered(): Boolean = registeredFactory.isDefined | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are these one line functions needed?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These seem to be used only for testing. Seems unnecessary additional public methods. |
||
|
|
||
| /** | ||
| * Get information about the currently registered factory for debugging/logging. | ||
| * Returns None if no factory is registered. | ||
| */ | ||
| def getFactoryInfo(): Option[String] = registeredFactory.map(_.getClass.getName) | ||
|
|
||
| /** | ||
| * Convenience method to create a client from metadata using the registered factory. | ||
| */ | ||
murali-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,8 @@ class JarSuite extends AnyFunSuite { | |
| "scala/", | ||
| // e.g. org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.class | ||
| "org/apache/spark/sql/delta/icebergShaded/", | ||
| // Server-side planning support | ||
| "org/apache/spark/sql/delta/serverSidePlanning/", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do you need this changes now since you switched to reflection based approach?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm getting some errors without this (java.lang.Exception: Prohibited jar classes found). here's what i think is happening: delta-iceberg has some sort of allowlist so it excludes classes by default (to prevent dupes vs delta-spark i guess). and i need to allow it in this suite so that it does not complain about a new class being present in the jar (due to including it in build.sbt) when it shouldn't . |
||
| // We explicitly include all the /delta/commands/convert classes we want, to ensure we don't | ||
| // accidentally pull in some from delta-spark package. | ||
| "org/apache/spark/sql/delta/commands/convert/IcebergFileManifest", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should this be a const in the object?