Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
cf9c88b
Add reflection-based auto-registration for ServerSidePlanningClientFa…
murali-db Jan 22, 2026
d8df656
Simplify documentation comments
murali-db Jan 22, 2026
1ecfc3b
Clean up comments
murali-db Jan 22, 2026
2371d01
Throw specific errors for factory loading failures
murali-db Jan 22, 2026
7758d5b
Rename serviceLoaderAttempted to autoRegistrationAttempted
murali-db Jan 22, 2026
fcf1333
Simplify error handling to single catch block
murali-db Jan 22, 2026
2f1ddf2
Fix scalastyle line length violations
murali-db Jan 22, 2026
c6ccf01
Fix remaining scalastyle violations
murali-db Jan 22, 2026
b9fedac
Remove unused DeltaLogging import and trait extension
murali-db Jan 22, 2026
2f60c19
Add comprehensive tests for ServerSidePlanningClientFactory auto-regi…
murali-db Jan 23, 2026
c81fbc5
Refactor ServerSidePlanningClientFactorySuite to eliminate duplication
murali-db Jan 23, 2026
27e2588
Remove unnecessary FactoryAction sealed trait abstraction
murali-db Jan 23, 2026
1385732
Move auto-registration tests from spark to iceberg module
murali-db Jan 23, 2026
005a9db
Remove redundant auto-registration test cases
murali-db Jan 23, 2026
b984757
Extract class name constant and add synchronization to factory methods
murali-db Jan 23, 2026
fda4644
Remove unnecessary assertSameInstance helper function
murali-db Jan 23, 2026
9599b17
Remove redundant comments from isFactoryRegistered and getFactoryInfo
murali-db Jan 23, 2026
db2c72f
Remove redundant tests and rename getFactoryInfo to getRegisteredFact…
murali-db Jan 23, 2026
e26657c
Remove redundant test cases to keep only essential coverage
murali-db Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,8 @@ lazy val testDeltaIcebergJar = (project in file("testDeltaIcebergJar"))
val deltaIcebergSparkIncludePrefixes = Seq(
// We want everything from this package
"org/apache/spark/sql/delta/icebergShaded",
// Server-side planning support
"org/apache/spark/sql/delta/serverSidePlanning",

// We only want the files in this project from this package. e.g. we want to exclude
// org/apache/spark/sql/delta/commands/convert/ConvertTargetFile.class (from delta-spark project).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.serverSidePlanning

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

/**
* Tests for ServerSidePlanningClientFactory auto-registration with
* IcebergRESTCatalogPlanningClientFactory.
*
* These tests verify that the ServiceLoader-based auto-registration mechanism
* correctly discovers and registers the IcebergRESTCatalogPlanningClientFactory
* when it's on the classpath.
*/
class ServerSidePlanningClientFactoryAutoRegistrationSuite
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in iceberg module specifically testing iceberg factory is getting auto registered ...shouldnt it be named in the same way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact why do we even need to test this. The only thing that need to be tested outside the other test suite in spark module is that it's iceberg factory by default. That's very tiny test and can't that be added to any existing iceberg module ssp suite??

extends QueryTest
with SharedSparkSession {

override def afterEach(): Unit = {
try {
ServerSidePlanningClientFactory.clearFactory()
} finally {
super.afterEach()
}
}

/**
* Execute test block with clean factory state (setup + teardown).
*/
private def withCleanFactory[T](testFn: => T): T = {
try {
ServerSidePlanningClientFactory.clearFactory()
testFn
} finally {
ServerSidePlanningClientFactory.clearFactory()
}
}

test("auto-registration succeeds when IcebergRESTCatalogPlanningClientFactory " +
"is on classpath") {
withCleanFactory {
// Verify factory is not registered initially
assert(!ServerSidePlanningClientFactory.isFactoryRegistered(),
"Factory should not be registered initially")
assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isEmpty,
"Factory info should be empty initially")

// Calling getFactory() should trigger auto-registration
val factory = ServerSidePlanningClientFactory.getFactory()

// Verify factory is successfully registered
assert(factory != null, "Factory should not be null after auto-registration")
assert(ServerSidePlanningClientFactory.isFactoryRegistered(),
"Factory should be registered after getFactory() call")

// Verify it's the correct type
val factoryInfo = ServerSidePlanningClientFactory.getRegisteredFactoryName()
assert(factoryInfo.isDefined,
"Factory info should be defined after auto-registration")
assert(factoryInfo.get.contains("IcebergRESTCatalogPlanningClientFactory"),
s"Expected IcebergRESTCatalogPlanningClientFactory, got: ${factoryInfo.get}")
}
}

test("autoRegistrationAttempted flag prevents multiple registration attempts") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is nothing specific to iceberg client..it's testing clearing which is already covered in the spark module tests.

withCleanFactory {
// First call triggers auto-registration
val factory1 = ServerSidePlanningClientFactory.getFactory()
val factoryInfo1 = ServerSidePlanningClientFactory.getRegisteredFactoryName()

// Multiple calls should return the same cached instance
val factory2 = ServerSidePlanningClientFactory.getFactory()
val factory3 = ServerSidePlanningClientFactory.getFactory()

// Verify all calls return the same instance (reference equality)
assert(factory1 eq factory2,
"Second getFactory() call should return cached instance")
assert(factory2 eq factory3,
"Third getFactory() call should return cached instance")

// Verify factory info remains consistent
assert(ServerSidePlanningClientFactory.getRegisteredFactoryName() == factoryInfo1,
"Factory info should remain consistent across multiple calls")

// After clearFactory(), should allow fresh registration
ServerSidePlanningClientFactory.clearFactory()
val factory4 = ServerSidePlanningClientFactory.getFactory()

// Verify new registration occurred
assert(ServerSidePlanningClientFactory.isFactoryRegistered(),
"Factory should be registered after clearFactory() and getFactory()")
assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isDefined,
"Factory info should be defined after fresh registration")
assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().get
.contains("IcebergRESTCatalogPlanningClientFactory"),
"Fresh registration should register Iceberg factory")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,41 +70,92 @@ private[serverSidePlanning] trait ServerSidePlanningClientFactory {
}

/**
* Registry for client factories. Can be configured for testing or to provide
* production implementations (e.g., IcebergRESTCatalogPlanningClientFactory).
*
* By default, no factory is registered. Production code should register an appropriate
* factory implementation before attempting to create clients.
* Registry for client factories. Automatically discovers and registers implementations
* using reflection-based auto-discovery on first access to the factory. Manual registration
* using setFactory() is only needed for testing or to override the auto-discovered factory.
*/
private[serverSidePlanning] object ServerSidePlanningClientFactory {
// Fully qualified class name for auto-registration via reflection
private val ICEBERG_FACTORY_CLASS_NAME =
"org.apache.spark.sql.delta.serverSidePlanning.IcebergRESTCatalogPlanningClientFactory"

@volatile private var registeredFactory: Option[ServerSidePlanningClientFactory] = None
@volatile private var autoRegistrationAttempted: Boolean = false

// Lazy initialization - only runs when getFactory() is called and no factory is set.
// Uses reflection to load the hardcoded IcebergRESTCatalogPlanningClientFactory class.
private def tryAutoRegisterFactory(): Unit = {
// Double-checked locking pattern to ensure initialization happens only once
if (!autoRegistrationAttempted) {
synchronized {
if (!autoRegistrationAttempted) {
autoRegistrationAttempted = true

try {
// Use reflection to load the Iceberg factory class
// scalastyle:off classforname
val clazz = Class.forName(ICEBERG_FACTORY_CLASS_NAME)
// scalastyle:on classforname
val factory = clazz.getConstructor().newInstance()
.asInstanceOf[ServerSidePlanningClientFactory]
registeredFactory = Some(factory)
} catch {
case e: Exception =>
throw new IllegalStateException(
"Unable to load IcebergRESTCatalogPlanningClientFactory " +
"for server-side planning. Ensure the delta-iceberg JAR is on the " +
"classpath and compatible with this Delta version.",
e)
}
}
}
}
}

/**
* Set a factory for production use or testing.
* Set a factory, overriding any auto-registered factory.
* Synchronized to prevent race conditions with auto-registration.
*/
private[serverSidePlanning] def setFactory(factory: ServerSidePlanningClientFactory): Unit = {
registeredFactory = Some(factory)
synchronized {
registeredFactory = Some(factory)
}
}

/**
* Clear the registered factory.
* Synchronized to ensure atomic reset of both flags.
*/
private[serverSidePlanning] def clearFactory(): Unit = {
registeredFactory = None
synchronized {
registeredFactory = None
autoRegistrationAttempted = false
}
}

/**
* Get the currently registered factory.
* Throws IllegalStateException if no factory has been registered.
* Throws IllegalStateException if no factory has been registered (either via reflection-based
* auto-discovery or explicit setFactory() call).
*/
def getFactory(): ServerSidePlanningClientFactory = {
// Try auto-registration if not already attempted and no factory is manually set
if (registeredFactory.isEmpty) {
tryAutoRegisterFactory()
}

registeredFactory.getOrElse {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the dead access of the registeredFactory be in synchronized as well?

Alternatively registeredFactory is marked as volatile and the only synchronization needed is only in the auto registration so that multiple thread don't register concurrently.

throw new IllegalStateException(
"No ServerSidePlanningClientFactory has been registered. " +
"Call ServerSidePlanningClientFactory.setFactory() to register an implementation.")
"Ensure delta-iceberg JAR is on the classpath for auto-registration, " +
"or call ServerSidePlanningClientFactory.setFactory() to register manually.")
}
}

def isFactoryRegistered(): Boolean = registeredFactory.isDefined
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these one line functions needed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem to be used only for testing. Seems unnecessary additional public methods.


def getRegisteredFactoryName(): Option[String] = registeredFactory.map(_.getClass.getName)

/**
* Convenience method to create a client from metadata using the registered factory.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2026

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.serverSidePlanning

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

/**
* Unit tests for ServerSidePlanningClientFactory core functionality.
* Tests manual factory registration, state management, and lifecycle.
*/
class ServerSidePlanningClientFactorySuite extends QueryTest with SharedSparkSession {

override def afterEach(): Unit = {
try {
ServerSidePlanningClientFactory.clearFactory()
} finally {
super.afterEach()
}
}

// ========== Test Infrastructure ==========

/**
* Execute test block with clean factory state (setup + teardown).
* Ensures factory is cleared before and after the test.
*/
private def withCleanFactory[T](testFn: => T): T = {
try {
ServerSidePlanningClientFactory.clearFactory()
testFn
} finally {
ServerSidePlanningClientFactory.clearFactory()
}
}

/**
* Assert that factory is registered and matches expected type.
* Includes descriptive error messages.
*/
private def assertFactoryType(
expectedType: String,
context: String = ""): Unit = {
val prefix = if (context.nonEmpty) s"[$context] " else ""

assert(ServerSidePlanningClientFactory.isFactoryRegistered(),
s"${prefix}Factory should be registered")

val factoryInfo = ServerSidePlanningClientFactory.getRegisteredFactoryName()
assert(factoryInfo.isDefined,
s"${prefix}Factory info should be defined")
assert(factoryInfo.get.contains(expectedType),
s"${prefix}Expected factory type=$expectedType, got: ${factoryInfo.get}")
}

/**
* Assert that factory is NOT registered.
*/
private def assertNoFactory(context: String = ""): Unit = {
val prefix = if (context.nonEmpty) s"[$context] " else ""

assert(!ServerSidePlanningClientFactory.isFactoryRegistered(),
s"${prefix}Factory should not be registered")
assert(ServerSidePlanningClientFactory.getRegisteredFactoryName().isEmpty,
s"${prefix}Factory info should be empty")
}

// ========== Tests ==========

test("manual setFactory can replace existing factory") {
withCleanFactory {
// Set first factory
val firstFactory = new TestServerSidePlanningClientFactory()
ServerSidePlanningClientFactory.setFactory(firstFactory)
assert(ServerSidePlanningClientFactory.getFactory() eq firstFactory,
"Should return first factory")

// Replace with second factory
val secondFactory = new TestServerSidePlanningClientFactory()
ServerSidePlanningClientFactory.setFactory(secondFactory)

// Verify replacement
val retrievedFactory = ServerSidePlanningClientFactory.getFactory()
assert(retrievedFactory eq secondFactory,
"getFactory() should return the second factory after replacement")
assert(!(retrievedFactory eq firstFactory),
"Should not return the first factory")
}
}

test("getFactory returns same instance across multiple calls") {
withCleanFactory {
val testFactory = new TestServerSidePlanningClientFactory()
ServerSidePlanningClientFactory.setFactory(testFactory)

val factory1 = ServerSidePlanningClientFactory.getFactory()
val factory2 = ServerSidePlanningClientFactory.getFactory()
val factory3 = ServerSidePlanningClientFactory.getFactory()

assert(factory1 eq factory2, "Second call should return same instance as first")
assert(factory2 eq factory3, "Third call should return same instance as second")
assert(factory1 eq testFactory, "Should return the originally set factory")
}
}

test("clearFactory resets registration state") {
withCleanFactory {
val testFactory = new TestServerSidePlanningClientFactory()
ServerSidePlanningClientFactory.setFactory(testFactory)
assert(ServerSidePlanningClientFactory.isFactoryRegistered(),
"Factory should be registered")

// Clear factory
ServerSidePlanningClientFactory.clearFactory()

// Verify factory is no longer registered
assertNoFactory("after clearFactory")
}
}

}
2 changes: 2 additions & 0 deletions testDeltaIcebergJar/src/test/scala/JarSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class JarSuite extends AnyFunSuite {
"scala/",
// e.g. org/apache/spark/sql/delta/icebergShaded/IcebergTransactionUtils.class
"org/apache/spark/sql/delta/icebergShaded/",
// Server-side planning support
"org/apache/spark/sql/delta/serverSidePlanning/",
Copy link
Contributor

@tdas tdas Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this changes now since you switched to reflection based approach?

Copy link
Contributor Author

@murali-db murali-db Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm getting some errors without this (java.lang.Exception: Prohibited jar classes found). here's what i think is happening: delta-iceberg has some sort of allowlist so it excludes classes by default (to prevent dupes vs delta-spark i guess). and i need to allow it in this suite so that it does not complain about a new class being present in the jar (due to including it in build.sbt) when it shouldn't .

// We explicitly include all the /delta/commands/convert classes we want, to ensure we don't
// accidentally pull in some from delta-spark package.
"org/apache/spark/sql/delta/commands/convert/IcebergFileManifest",
Expand Down
Loading