[SPARK-52008] [SS] Add StateStore TaskCompletionListener to abort store and throw error #50795

Open: liviazhu-db wants to merge 3 commits into master

@liviazhu-db (Contributor) commented May 5, 2025

What changes were proposed in this pull request?

Added a TaskCompletionListener to both the HDFS and RocksDB StateStore. The listener checks whether the store is still in the UPDATING state (neither committed nor aborted) and, if so, aborts it. If the task has not already failed or been interrupted, the listener then fails the task with STATE_STORE_UPDATING_AFTER_TASK_COMPLETION.
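The listener logic described above can be modeled roughly as follows. This is a minimal, Spark-free sketch: `MiniTaskContext`, `MiniStore`, and the use of `IllegalStateException` are hypothetical stand-ins chosen for illustration, not the classes or exception type used in this PR.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for illustration only; none of these are Spark classes.
object ListenerSketch {
  sealed trait State
  case object Updating extends State
  case object Committed extends State
  case object Aborted extends State

  // Hypothetical task context: remembers whether the task already failed or
  // was interrupted, and runs registered listeners when the task completes.
  final class MiniTaskContext(val failedOrInterrupted: Boolean) {
    private val listeners = ArrayBuffer.empty[MiniTaskContext => Unit]
    def addCompletionListener(f: MiniTaskContext => Unit): Unit = listeners += f
    def markTaskCompleted(): Unit = listeners.foreach(f => f(this))
  }

  // Hypothetical store that registers its own completion listener on creation.
  final class MiniStore(ctx: MiniTaskContext) {
    private var state: State = Updating
    def currentState: State = state

    def commit(): Unit = {
      require(state == Updating, s"cannot commit in state $state")
      state = Committed
    }

    def abort(): Unit = if (state == Updating) state = Aborted

    ctx.addCompletionListener { c =>
      // Only act if the store was neither committed nor aborted.
      if (state == Updating) {
        abort()
        // Do not mask an earlier failure: throw only for otherwise healthy tasks.
        if (!c.failedOrInterrupted) {
          throw new IllegalStateException("STATE_STORE_UPDATING_AFTER_TASK_COMPLETION")
        }
      }
    }
  }
}
```

In this model, a store that was committed before `markTaskCompleted()` is left alone, an uncommitted store on a healthy task is aborted and the call throws, and an uncommitted store on an already failed task is aborted silently.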

Why are the changes needed?

As explained in SPARK-52008, when a user's foreachBatch function does not completely consume the passed-in iterator, state stores are opened but never committed when the batch finishes, and no error is thrown. The next batch then fails with a "changelog/delta file not found" error, which confuses users.

Instead, the TaskCompletionListener should abort any state store that is still in the UPDATING state and throw an exception to fail the task (if the task has not already failed or been interrupted).
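To illustrate the failure mode, here is a small sketch that assumes the commit is tied to exhausting the iterator, as the description above implies. `MiniStore`, `withCompletion`, and `run` are hypothetical names for this example only and are not part of Spark.

```scala
// Illustration only: a commit that happens when the row iterator is exhausted.
object PartialConsumptionSketch {
  // Hypothetical store that only records whether commit() was called.
  final class MiniStore {
    var committed: Boolean = false
    def commit(): Unit = { committed = true }
  }

  // Wraps an iterator so that `onDone` runs once, the first time hasNext
  // reports that the underlying iterator is exhausted.
  def withCompletion[A](underlying: Iterator[A])(onDone: => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false
      def hasNext: Boolean = {
        val more = underlying.hasNext
        if (!more && !done) { done = true; onDone }
        more
      }
      def next(): A = underlying.next()
    }

  // Returns whether the store ended up committed.
  def run(consumeAll: Boolean): Boolean = {
    val store = new MiniStore
    val rows = withCompletion(Iterator(1, 2, 3))(store.commit())
    if (consumeAll) rows.foreach(_ => ()) // drains the iterator, triggering the commit
    else rows.next()                      // reads one row and stops early
    store.committed
  }
}
```

Calling `PartialConsumptionSketch.run(consumeAll = false)` leaves the store uncommitted without raising any error, which is the silent situation the new listener is meant to surface.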

Does this PR introduce any user-facing change?

Yes. The task now fails with STATE_STORE_UPDATING_AFTER_TASK_COMPLETION instead of a FileNotFound error.

How was this patch tested?

New foreachBatch (FEB) integration test and unit test.

Was this patch authored or co-authored using generative AI tooling?

No

@liviazhu-db liviazhu-db marked this pull request as ready for review May 5, 2025 23:03
@@ -113,6 +113,18 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
    case object COMMITTED extends STATE
    case object ABORTED extends STATE

    Option(TaskContext.get()).foreach { ctxt =>
      ctxt.addTaskCompletionListener[Unit]( ctx => {
Contributor

Can we also add a note about the order in which these listeners fire, and make sure that any change touching this part does not violate the underlying assumptions?

val errClass = "STATE_STORE_UPDATING_AFTER_TASK_COMPLETION"

// verify that we classified the exception
assert(queryEx.getMessage.contains(errClass))
Contributor

Can we use checkError instead?

2 participants