Skip to content

Commit 14565a8

Browse files
committed
stuff
1 parent a4b7c10 commit 14565a8

File tree

4 files changed

+42
-18
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

+2-4
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,13 @@ private[sql] class RocksDBStateStoreProvider
4646
class RocksDBStateStore(lastVersion: Long) extends StateStore {
4747
/**
4848
* Trait and classes representing the internal state of the store
49-
*
49+
*
5050
* State transitions:
5151
* - Initial state: UPDATING
5252
* - UPDATING -> COMMITTED: After successful commit()
5353
* - UPDATING -> ABORTED: After abort() or failed commit()
5454
* - UPDATING -> RELEASED: After release() without committing changes
55-
* - COMMITTED -> RELEASED: After release() following a successful commit
56-
* - ABORTED -> RELEASED: After release() following an abort
57-
*
55+
*
5856
* The RELEASED state is terminal and indicates that resources have been released
5957
* without affecting the underlying data (unlike ABORTED which rolls back changes).
6058
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

+24-9
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,31 @@ trait ReadStateStore {
120120

121121
/**
122122
* Release resources associated with this state store without rolling back changes.
123-
*
123+
*
124124
* Unlike `abort()` which rolls back uncommitted changes, `release()` simply releases
125-
* resources and locks without affecting the state data. This is particularly important
126-
* in the read-then-write pattern where:
127-
*
128-
* 1. A read-only store is opened to retrieve existing state
129-
* 2. The same store is converted to a writable store using `getWriteStore()`
130-
* 3. After the write store commits, we need to release resources without rolling back
131-
* the changes that were just committed
132-
*
125+
* resources and locks without affecting the state data.
126+
*
127+
* IMPORTANT: This method is only needed when you have a standalone ReadStateStore that
128+
* was NOT converted to a WriteStore. If you use the read-then-write pattern with
129+
* `getWriteStore()`, you should NOT call `release()` on the original ReadStateStore.
130+
*
131+
* Usage scenarios:
132+
* 1. When you have a standalone ReadStateStore that you're done with:
133+
* ```
134+
* val readStore = StateStore.getReadOnly(...)
135+
* // use readStore
136+
* readStore.release() // Correct: release the standalone read store
137+
* ```
138+
*
139+
* 2. When using the read-then-write pattern:
140+
* ```
141+
* val readStore = StateStore.getReadOnly(...)
142+
* val writeStore = StateStore.getWriteStore(readStore, ...)
143+
* // use writeStore
144+
* writeStore.commit()
145+
* // DO NOT call readStore.release() here - the writeStore handles cleanup
146+
* ```
147+
*
133148
* Implementations should ensure that:
134149
* 1. Any locks or resources held by this store are released
135150
* 2. No uncommitted changes are rolled back (unlike `abort()`)

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

+6-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@ import org.apache.spark.util.SerializableConfiguration
5555
* readStore = getReadStore()
5656
* writeStore = getWriteStore(readStore) // Reuses the same store connection
5757
* writeStore.commit()
58-
* // No need to abort/release readStore as it's the same underlying store
58+
* DO NOT call readStore.release() here - the writeStore handles cleanup
59+
*
60+
* IMPORTANT: When using this read-then-write pattern, you should NOT call release()
61+
* on the original ReadStateStore after committing the WriteStore. Since both stores
62+
* share the same underlying connection, the WriteStore's commit() will handle all
63+
* necessary cleanup.
5964
*/
6065
trait StateStoreRDDProvider {
6166
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala

+10-4
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,22 @@ package object state {
109109

110110
val cleanedF = dataRDD.sparkContext.clean(storeReadFn)
111111
val wrappedF = (store: ReadStateStore, iter: Iterator[T]) => {
112-
// Clean up the state store.
112+
// Clean up the state store when the task completes successfully.
113+
// We use release() instead of abort() because:
114+
// 1. For standalone ReadStateStore, we want to release resources without rolling back
115+
// 2. For ReadStateStore that will be converted to WriteStore, the WriteStore
116+
// will handle cleanup
113117
val taskContext = TaskContext.get()
114118
taskContext.addTaskCompletionListener[Unit](_ => {
115119
store.release()
116120
})
121+
122+
// On task failure, we need to abort to roll back any uncommitted changes.
123+
// We use abort() instead of release() because:
124+
// 1. We want to roll back any uncommitted changes to maintain consistency
125+
// 2. abort() ensures proper cleanup and rollback of uncommitted changes
117126
taskContext.addTaskFailureListener(new TaskFailureListener {
118127
override def onTaskFailure(context: TaskContext, error: Throwable): Unit = {
119-
// On task failure, we need to abort to roll back any uncommitted changes
120-
// We don't call release() here because it would leave the state in an inconsistent state
121-
// abort() ensures proper cleanup and rollback of uncommitted changes
122128
store.abort()
123129
}
124130
})

0 commit comments

Comments (0)