|
17 | 17 | package org.apache.spark.sql.delta.serverSidePlanning |
18 | 18 |
|
19 | 19 | import org.apache.spark.sql.SparkSession |
| 20 | +import org.apache.spark.sql.delta.metering.DeltaLogging |
20 | 21 | import org.apache.spark.sql.sources.Filter |
21 | 22 |
|
22 | 23 | /** |
@@ -70,41 +71,97 @@ private[serverSidePlanning] trait ServerSidePlanningClientFactory { |
70 | 71 | } |
71 | 72 |
|
72 | 73 | /** |
73 | | - * Registry for client factories. Can be configured for testing or to provide |
74 | | - * production implementations (e.g., IcebergRESTCatalogPlanningClientFactory). |
| 74 | + * Registry for client factories. Automatically discovers and registers implementations |
| 75 | + * using reflection-based auto-discovery on first access to the factory. |
75 | 76 | * |
76 | | - * By default, no factory is registered. Production code should register an appropriate |
77 | | - * factory implementation before attempting to create clients. |
| 77 | + * When delta-iceberg JAR is on the classpath, IcebergRESTCatalogPlanningClientFactory |
| 78 | + * is automatically registered via reflection with a hardcoded class name. Manual registration |
| 79 | + * using setFactory() is only needed for testing or to override the auto-discovered factory. |
78 | 80 | */ |
79 | | -private[serverSidePlanning] object ServerSidePlanningClientFactory { |
| 81 | +private[serverSidePlanning] object ServerSidePlanningClientFactory extends DeltaLogging { |
80 | 82 | @volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None |
| 83 | + @volatile private var serviceLoaderAttempted: Boolean = false |
| 84 | + |
| 85 | + // ========== REFLECTION-BASED AUTO-REGISTRATION ========== |
| 86 | + // Lazy initialization - only runs when getFactory() is called and no factory is set. |
| 87 | + // Uses reflection to load the hardcoded IcebergRESTCatalogPlanningClientFactory class. |
| 88 | + private def tryAutoRegisterFactory(): Unit = { |
| 89 | + // Double-checked locking pattern to ensure initialization happens only once |
| 90 | + if (!serviceLoaderAttempted) { |
| 91 | + synchronized { |
| 92 | + if (!serviceLoaderAttempted) { |
| 93 | + serviceLoaderAttempted = true |
| 94 | + |
| 95 | + try { |
| 96 | + // Use reflection to load the Iceberg factory class |
| 97 | + val clazz = Class.forName( |
| 98 | + "org.apache.spark.sql.delta.serverSidePlanning.IcebergRESTCatalogPlanningClientFactory") |
| 99 | + val factory = clazz.getConstructor().newInstance() |
| 100 | + .asInstanceOf[ServerSidePlanningClientFactory] |
| 101 | + registeredFactory = Some(factory) |
| 102 | + } catch { |
| 103 | + case _: ClassNotFoundException => |
| 104 | + // delta-iceberg not on classpath, no factory available |
| 105 | + // This is fine - server-side planning just won't be available |
| 106 | + case e: Exception => |
| 107 | + // Unexpected error during reflection - log but don't fail |
| 108 | + logWarning(s"Failed to load server-side planning factory: ${e.getMessage}") |
| 109 | + } |
| 110 | + } |
| 111 | + } |
| 112 | + } |
| 113 | + } |
| 114 | + // ========== END REFLECTION-BASED AUTO-REGISTRATION ========== |
81 | 115 |
|
82 | 116 | /** |
83 | | - * Set a factory for production use or testing. |
| 117 | + * Set a factory, overriding any auto-registered factory. |
| 118 | + * Primarily useful for testing or providing custom implementations. |
84 | 119 | */ |
85 | 120 | private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = { |
86 | 121 | registeredFactory = Some(factory) |
87 | 122 | } |
88 | 123 |
|
89 | 124 | /** |
90 | | - * Clear the registered factory. |
| 125 | + * Clear the registered factory. Primarily useful for testing to reset state between tests. |
| 126 | + * This also resets the ServiceLoader discovery state, allowing it to be re-triggered on |
| 127 | + * the next getFactory() call. |
91 | 128 | */ |
92 | 129 | private[serverSidePlanning] def clearFactory(): Unit = { |
93 | 130 | registeredFactory = None |
| 131 | + serviceLoaderAttempted = false |
94 | 132 | } |
95 | 133 |
|
96 | 134 | /** |
97 | 135 | * Get the currently registered factory. |
98 | | - * Throws IllegalStateException if no factory has been registered. |
| 136 | + * Throws IllegalStateException if no factory has been registered (either via ServiceLoader |
| 137 | + * auto-discovery or explicit setFactory() call). |
99 | 138 | */ |
100 | 139 | def getFactory(): ServerSidePlanningClientFactory = { |
| 140 | + // Try auto-registration if not already attempted and no factory is manually set |
| 141 | + if (registeredFactory.isEmpty) { |
| 142 | + tryAutoRegisterFactory() |
| 143 | + } |
| 144 | + |
101 | 145 | registeredFactory.getOrElse { |
102 | 146 | throw new IllegalStateException( |
103 | 147 | "No ServerSidePlanningClientFactory has been registered. " + |
104 | | - "Call ServerSidePlanningClientFactory.setFactory() to register an implementation.") |
| 148 | + "Ensure delta-iceberg JAR is on the classpath for auto-registration, " + |
| 149 | + "or call ServerSidePlanningClientFactory.setFactory() to register manually.") |
105 | 150 | } |
106 | 151 | } |
107 | 152 |
|
| 153 | + /** |
| 154 | + * Check if a factory is currently registered (either via ServiceLoader or setFactory()). |
| 155 | + * Useful for testing or conditional logic. |
| 156 | + */ |
| 157 | + def isFactoryRegistered(): Boolean = registeredFactory.isDefined |
| 158 | + |
| 159 | + /** |
| 160 | + * Get information about the currently registered factory for debugging/logging. |
| 161 | + * Returns None if no factory is registered. |
| 162 | + */ |
| 163 | + def getFactoryInfo(): Option[String] = registeredFactory.map(_.getClass.getName) |
| 164 | + |
108 | 165 | /** |
109 | 166 | * Convenience method to create a client from metadata using the registered factory. |
110 | 167 | */ |
|
0 commit comments