
Commit 2c62899

neilramaswamy authored and cloud-fan committed
[SPARK-51083][CORE] Modify JavaUtils to not swallow InterruptedExceptions
### What changes were proposed in this pull request?

These changes modify the `deleteRecursivelyUsingUnixNative` method in `JavaUtils.java` so that it no longer swallows `InterruptedException`s. The bulk of the changes in this PR modify the signatures of methods that directly or indirectly use `JavaUtils.deleteRecursivelyUsingUnixNative`.

### Why are the changes needed?

`JavaUtils.deleteRecursively` swallows `InterruptedException`s, and that can cause the loss of an entire executor. Consider a streaming task running a streaming aggregation. It takes the following steps:

1. It [writes](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L720) all data to the state store, and then [reads](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L732) this data back to emit it (in, say, Complete mode).
2. With the RocksDB state store, the task doesn't actually acquire any locks, perform any sleeping, etc. during step 1. This means that if the query is cancelled and the task is interrupted, it won't respond to the interrupt.
3. After step 1 is done, the task [calls commit](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L744) on the state store.
4. In RocksDB, this turns into a call to `RocksDB::commit`, which can call [createSnapshot](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L879).
5. In `createSnapshot`, [we call Utils.deleteRecursively](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala#L879) to make sure that the temporary directory into which we place the RocksDB checkpoint is empty.
6. In `deleteRecursively`, we [call deleteRecursivelyUsingUnixNative](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L115), which issues an `rm -rf` and then [calls process.waitFor](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L183), which is declared to throw an `InterruptedException`.
7. Unfortunately, `deleteRecursivelyUsingUnixNative` [indiscriminately catches all Exceptions](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L184), including the `InterruptedException` potentially thrown by `process.waitFor`. It then rethrows an `IOException`.
8. That `IOException` is caught [here](https://github.com/apache/spark/blob/1fc9d7d92ec60cc21e8fd54562e702ea67cf01d3/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java#L117), and a warning is logged. The interrupt has now been swallowed.

A streaming task thread that misses this interrupt will not exit. If it doesn't exit for 60 seconds, the TaskReaper will kill the JVM.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This is fairly tricky to test; in fact, we don't exercise the code path for `deleteRecursivelyUsingUnixNative` during testing on Apple Silicon, which made it hard for me. My approach here was to modify `deleteRecursivelyUsingUnixNative` to explicitly sleep. Then, I called that method in a background thread, interrupting it from the test thread. If all went correctly, the background thread _should_ have received an interrupt that it did not swallow, and the test thread could assert that this happened. You can see this test in [this commit](95577db), but since it's an intrusive change, I have reverted it. Example output is:

```
[info] UtilsSuite:
1. Starting thread
2. Waiting for it to get to its sleep
3. Going to spawn rm -rf
4. Finished deleting directory
5. Sleeping for 10 seconds
6. Interrupting thread that is sleeping
[info] - deleteRecursively throws InterruptedException (5 seconds, 18 milliseconds)
7. Catching and rethrowing interrupted exception
8. Thread exiting
9. gotInterruptedException = true
```

I validated that the test case I added _fails_ without my changes to fix the interrupt swallowing; you can check out that commit [here](70f2a7b). If you are a reviewer who can think of a way to create a test case we can check in, please leave a review comment.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49796 from neilramaswamy/nr/spark-51083.

Authored-by: Neil Ramaswamy <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
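The swallowing pattern described in steps 6-8 can be reproduced outside Spark. The following standalone sketch is hypothetical code (not part of this PR): `buggyDelete` stands in for the old `deleteRecursivelyUsingUnixNative`, with `Thread.sleep` in place of `process.waitFor`. It shows how a broad `catch (Exception e)` around a blocking call converts an `InterruptedException` into an `IOException` and leaves the thread's interrupt status cleared, so the cancellation signal is lost:

```java
import java.io.IOException;

public class InterruptSwallowDemo {

  // Buggy pattern (mirrors the old deleteRecursivelyUsingUnixNative): the broad
  // catch also traps InterruptedException and rewraps it as an IOException.
  static void buggyDelete() throws IOException {
    try {
      Thread.sleep(10_000); // stand-in for process.waitFor()
    } catch (Exception e) { // also catches InterruptedException!
      throw new IOException("Failed to delete", e);
    }
  }

  // Runs buggyDelete on a background thread, interrupts it, and reports whether
  // the thread's interrupt status survived. With the buggy pattern it does not:
  // throwing InterruptedException clears the flag, and the rewrap hides it.
  static boolean interruptStatusAfterCatch() {
    final boolean[] status = new boolean[1];
    Thread worker = new Thread(() -> {
      try {
        buggyDelete();
      } catch (IOException e) {
        // The interrupt surfaced as a mere IOException; callers just log this.
      }
      status[0] = Thread.currentThread().isInterrupted();
    });
    worker.start();
    worker.interrupt(); // simulate task cancellation
    try {
      worker.join();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    return status[0];
  }

  public static void main(String[] args) {
    // Prints false: the cancellation signal has been swallowed.
    System.out.println("interrupted after catch: " + interruptStatusAfterCatch());
  }
}
```

Because the flag ends up `false`, later blocking calls on the same thread will not throw either, which is why the task only dies when the TaskReaper eventually kills the JVM.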
1 parent 0af25b8 commit 2c62899

File tree

4 files changed: +16 −10 lines

common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java

Lines changed: 1 addition & 2 deletions

```diff
@@ -19,7 +19,6 @@
 import java.io.File;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
@@ -85,7 +84,7 @@ void cleanup() {
     if (tempDir != null) {
       try {
         JavaUtils.deleteRecursively(tempDir);
-      } catch (IOException io) {
+      } catch (Exception io) {
         throw new RuntimeException(io);
       }
     }
```

common/network-common/src/test/java/org/apache/spark/network/util/DBProviderSuite.java

Lines changed: 4 additions & 3 deletions

```diff
@@ -32,17 +32,18 @@
 public class DBProviderSuite {
 
   @Test
-  public void testRockDBCheckVersionFailed() throws IOException {
+  public void testRockDBCheckVersionFailed() throws IOException, InterruptedException {
     testCheckVersionFailed(DBBackend.ROCKSDB, "rocksdb");
   }
 
   @Test
-  public void testLevelDBCheckVersionFailed() throws IOException {
+  public void testLevelDBCheckVersionFailed() throws IOException, InterruptedException {
     assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"));
     testCheckVersionFailed(DBBackend.LEVELDB, "leveldb");
   }
 
-  private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix) throws IOException {
+  private void testCheckVersionFailed(DBBackend dbBackend, String namePrefix)
+      throws IOException, InterruptedException {
     String root = System.getProperty("java.io.tmpdir");
     File dbFile = JavaUtils.createDirectory(root, namePrefix);
     try {
```

common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java

Lines changed: 1 addition & 1 deletion

```diff
@@ -63,7 +63,7 @@ public void cleanup() {
     for (String localDir : localDirs) {
       try {
         JavaUtils.deleteRecursively(new File(localDir));
-      } catch (IOException e) {
+      } catch (Exception e) {
         logger.warn("Unable to cleanup localDir = " + localDir, e);
       }
     }
```
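The test cleanups in this PR widen `catch (IOException ...)` to `catch (Exception ...)` because `deleteRecursively` now also declares `InterruptedException`. A hypothetical, more explicit alternative (not what the PR does; names and bodies below are invented for illustration) is a multi-catch that names both checked exceptions and restores the interrupt flag, which is the conventional way to handle an `InterruptedException` that cannot be rethrown:

```java
import java.io.File;
import java.io.IOException;

public class MultiCatchSketch {

  // Stand-in for JavaUtils.deleteRecursively's new signature; the real deletion
  // logic is omitted, this body is hypothetical.
  static void deleteRecursively(File file) throws IOException, InterruptedException {
    if (file == null) return;
    // ... real deletion logic would go here ...
  }

  // Cleanup helper that names both checked exceptions explicitly instead of
  // catching Exception.
  static String cleanup(File tempDir) {
    try {
      deleteRecursively(tempDir);
      return "deleted";
    } catch (IOException | InterruptedException e) {
      if (e instanceof InterruptedException) {
        // Re-assert the interrupt so callers further up still see cancellation.
        Thread.currentThread().interrupt();
      }
      return "failed: " + e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(cleanup(null));
  }
}
```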

common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java

Lines changed: 10 additions & 4 deletions

```diff
@@ -90,7 +90,7 @@ public static String bytesToString(ByteBuffer b) {
    * @param file Input file / dir to be deleted
    * @throws IOException if deletion is unsuccessful
    */
-  public static void deleteRecursively(File file) throws IOException {
+  public static void deleteRecursively(File file) throws IOException, InterruptedException {
     deleteRecursively(file, null);
   }
 
@@ -103,7 +103,8 @@ public static void deleteRecursively(File file) throws IOException {
    * are deleted.
    * @throws IOException if deletion is unsuccessful
    */
-  public static void deleteRecursively(File file, FilenameFilter filter) throws IOException {
+  public static void deleteRecursively(File file, FilenameFilter filter)
+      throws IOException, InterruptedException {
     if (file == null) { return; }
 
     // On Unix systems, use operating system command to run faster
@@ -125,7 +126,7 @@ public static void deleteRecursively(File file, FilenameFilter filter) throws IO
   private static void deleteRecursivelyUsingJavaIO(
       File file,
-      FilenameFilter filter) throws IOException {
+      FilenameFilter filter) throws IOException, InterruptedException {
     BasicFileAttributes fileAttributes = readFileAttributes(file);
     // SPARK-50716: If the file attributes are null, that is, the file attributes cannot be read,
     // or if the file does not exist and is not a broken symbolic link, then return directly.
@@ -168,7 +169,8 @@ private static BasicFileAttributes readFileAttributes(File file) {
     }
   }
 
-  private static void deleteRecursivelyUsingUnixNative(File file) throws IOException {
+  private static void deleteRecursivelyUsingUnixNative(File file)
+      throws InterruptedException, IOException {
     ProcessBuilder builder = new ProcessBuilder("rm", "-rf", file.getAbsolutePath());
     Process process = null;
     int exitCode = -1;
@@ -181,6 +183,10 @@ private static void deleteRecursivelyUsingUnixNative(File file) throws IOExcepti
       process = builder.start();
 
       exitCode = process.waitFor();
+    } catch (InterruptedException e) {
+      // SPARK-51083: Specifically rethrow InterruptedException if it occurs, since swallowing
+      // it will lead to tasks missing cancellation.
+      throw e;
     } catch (Exception e) {
       throw new IOException("Failed to delete: " + file.getAbsolutePath(), e);
     } finally {
```
