Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51214][ML][PYTHON][CONNECT] Don't eagerly remove the cached models for fit_transform #49948

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

zhengruifeng
Copy link
Contributor

@zhengruifeng zhengruifeng commented Feb 14, 2025

What changes were proposed in this pull request?

Don't eagerly remove the cached models for fit_transform:
1, still keep the Delete ml command protobuf, but no longer call it in __del__ in the python client side;
2, build the ml cache with guava CacheBuilder and soft references, and specify the maximum size and time out.

Why are the changes needed?

a common ml pipeline pattern is fit_transform:

def fit_transform(df):
    model = estimator.fit(df)
    return model.transform(df)

df2 = fit_transform(df)
df2.count()

existing implementation eagerly deletes the intermediate model from the ml cache, right after fit_transform, and thus causes NPE

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.lang.NullPointerException) Cannot invoke "org.apache.spark.ml.Model.copy(org.apache.spark.ml.param.ParamMap)" because "model" is null

JVM stacktrace:
java.lang.NullPointerException
	at org.apache.spark.sql.connect.ml.ModelAttributeHelper.transform(MLHandler.scala:68)
	at org.apache.spark.sql.connect.ml.MLHandler$.transformMLRelation(MLHandler.scala:313)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:231)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:477)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:476)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:147)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:133)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelationalGroupedAggregate(SparkConnectPlanner.scala:2318)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformAggregate(SparkConnectPlanner.scala:2299)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:165)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:477)

Does this PR introduce any user-facing change?

yes

How was this patch tested?

added tests

Was this patch authored or co-authored using generative AI tooling?

no


private[connect] object MLCache {
// The maximum number of distinct items in the cache.
private val MAX_CACHED_ITEMS = 100
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can add configs later (in 4.1)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we can't find an item in the cache, do we throw a proper exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, it throws a NPE

let me add an error class for it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants