Skip to content

[SPARK-55337][SS] Fix MemoryStream backward compatibility#54108

Open
cloud-fan wants to merge 4 commits intoapache:masterfrom
cloud-fan:memory-stream-compat
Open

[SPARK-55337][SS] Fix MemoryStream backward compatibility#54108
cloud-fan wants to merge 4 commits intoapache:masterfrom
cloud-fan:memory-stream-compat

Conversation

@cloud-fan
Copy link
Contributor

What changes were proposed in this pull request?

This is a followup to #52402 that addresses backward compatibility concerns:

  1. Keep the original implicit SQLContext factory methods for full backward compatibility
  2. Add new overloads with explicit SparkSession parameter for new code
  3. Fix TestGraphRegistrationContext to provide implicit spark and sqlContext to avoid name shadowing issues in nested classes
  4. Remove redundant implicit val sparkSession declarations from pipeline tests that are no longer needed with the fix

Why are the changes needed?

PR #52402 changed the MemoryStream API to use implicit SparkSession which broke backward compatibility for code that only has implicit SQLContext available. This followup ensures:

Does this PR introduce any user-facing change?

No. This maintains full backward compatibility while adding new API options.

How was this patch tested?

Existing tests pass. The API changes are additive.

Was this patch authored or co-authored using generative AI tooling?

Yes

Made with Cursor

@github-actions
Copy link

github-actions bot commented Feb 3, 2026

JIRA Issue Information

=== Bug SPARK-55337 ===
Summary: Fix MemoryStream backward compatibility
Assignee: None
Status: Open
Affected: ["4.1.1"]


This comment was automatically generated by GitHub Actions


// Deprecated: Used when an implicit SQLContext is in scope
@deprecated("Use MemoryStream.apply with an implicit SparkSession instead of SQLContext", "4.1.0")
def apply[A: Encoder]()(implicit sqlContext: SQLContext): MemoryStream[A] =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the problem. This is not backward compatible, as the previous API is def apply[A: Encoder](implicit sqlContext: SQLContext): MemoryStream[A] (no parentheses).

There is no way to keep both implicits. So the proposal here is to only keep implicit SQLContext, and require to pass SparkSession implicitly.

@cloud-fan cloud-fan force-pushed the memory-stream-compat branch 2 times, most recently from 5825d62 to 14f1f0f Compare February 3, 2026 05:57
@cloud-fan
Copy link
Contributor Author

cc @ganeshashree @HeartSaVioR

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan , we cannot create a follow-up for the released JIRA issue because your PR has a different fix version, 4.1.2 (or 4.2.0), instead of 4.1.0. Please create a new JIRA ID.

Image

@cloud-fan cloud-fan changed the title [SPARK-53656][SS][FOLLOWUP] Improve MemoryStream backward compatibility [SPARK-55337][SS] Fix MemoryStream backward compatibility Feb 3, 2026
### What changes were proposed in this pull request?

This is a followup to apache#52402 that addresses backward compatibility concerns:

1. Keep the original `implicit SQLContext` factory methods for full backward compatibility
2. Add new overloads with explicit `SparkSession` parameter for new code
3. Fix `TestGraphRegistrationContext` to provide implicit `spark` and `sqlContext` to avoid name shadowing issues in nested classes
4. Remove redundant `implicit val sparkSession` declarations from pipeline tests that are no longer needed with the fix

### Why are the changes needed?

PR apache#52402 changed the MemoryStream API to use `implicit SparkSession` which broke backward compatibility for code that only has `implicit SQLContext` available. This followup ensures:

- Old code continues to work without modification
- New code can use SparkSession with explicit parameters
- Internal implementation uses SparkSession (modernization from apache#52402)

### Does this PR introduce _any_ user-facing change?

No. This maintains full backward compatibility while adding new API options.

### How was this patch tested?

Existing tests pass. The API changes are additive.

### Was this patch authored or co-authored using generative AI tooling?

Yes

Co-authored-by: Cursor <cursoragent@cursor.com>
@dongjoon-hyun
Copy link
Member

Thank you for getting a new JIRA ID.

cloud-fan and others added 2 commits February 3, 2026 18:54
Remove the `apply[A: Encoder](numPartitions: Int, sparkSession: SparkSession)` factory method that creates a semantic trap - it can accidentally match calls like `MemoryStream[T](0, spark)` interpreting the first argument as `numPartitions` instead of `id`, causing zero partitions to be created and no data to flow.

Users who need both `numPartitions` and explicit `SparkSession` can use the case class constructor directly: `new MemoryStream[A](id, sparkSession, Some(numPartitions))`.

Co-authored-by: Cursor <cursoragent@cursor.com>
Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 (one minor comment which I think it's easily addressable) but probably good to make clear about below...

Do we know about the impact of this change? It seems to be complicated to think through the impact of this change (and a prior change, since it was turned out to be not backward compatible).

* Creates a MemoryStream with explicit encoder and SparkSession.
* Usage: `MemoryStream(Encoders.scalaInt, spark)`
*/
def apply[A](encoder: Encoder[A], sparkSession: SparkSession): MemoryStream[A] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I roughly remember the intention was to discourage the usage of SQLContext - if that's the case, we probably want to have the way to pass numPartitions parameter.

That said, looks like this method (explicit encoder instance) is newly added. Is there any usage of this? We don't seem to add the same in ContinuousMemoryStream and LowLatencyMemoryStream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used in quite some places like StreamingQueryManagerSuite

  testQuietly("can start a streaming query with the same name in a different session") {
    val session2 = spark.cloneSession()

    val ds1 = MemoryStream(Encoders.INT, spark).toDS()
    val ds2 = MemoryStream(Encoders.INT, session2).toDS()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've add a new overload to specific numPartitions with SparkSession.

intsToDF(expected)(schema))
}

test("LowPriorityMemoryStreamImplicits works with implicit sqlContext") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol we added tests in wrong place... the file seems to be for memory "sink", not memory "source".

@cloud-fan
Copy link
Contributor Author

Do we know about the impact of this change? It seems to be complicated to think through the impact of this change (and a prior change, since it was turned out to be not backward compatible).

It's internal API so this is not strictly a bug fix. It will break new Spark 4.1 app that start to use the new def apply with SparkSession implicit, but not fixing it will break a lot more Spark apps that are not upgraded to 4.1 yet. There is no way to support both as they just conflict with each other.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pending CI

It's unfortunate we broke the backward compatibility and fixing it would break it again, but I understand there is no better way. Thanks for fixing the nasty bug.

@HeartSaVioR
Copy link
Contributor

Shall we rerun the CI? It's good to try again before looking into CI failure and say it's not relevant to this change.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants