[GLUTEN-9801] Clean up Write Files commit Protocol logic #9844

Open · wants to merge 1 commit into main

Conversation

JkSelf
Contributor

@JkSelf JkSelf commented Jun 3, 2025

What changes were proposed in this pull request?

Velox supports third-party integration for passing the file name, so Gluten no longer needs to define its own SparkWriteFilesCommitProtocol to manage the write-task commit process; Spark's FileCommitProtocol can be used directly.

With this PR, the files written by a failed task are deleted in abortTask().

Fix #9801 (comment)
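The flow the PR moves to can be sketched with a minimal, self-contained model. This is NOT Spark's actual FileCommitProtocol API; `TaskCommitModel` and its methods are hypothetical names used only to illustrate how abortTask() can delete the files a failed task wrote:

```scala
import java.nio.file.{Files, Path}

// Minimal model of the commit flow: the native writer reports each file it
// produced; commitTask() publishes them, abortTask() deletes them.
// All names here are hypothetical, for illustration only.
class TaskCommitModel {
  private var written = List.empty[Path]

  // Called by the write path once the native writer reports the target file.
  def recordFile(p: Path): Unit = written ::= p

  // On success: hand the produced files to the driver for the job commit.
  def commitTask(): List[Path] = written

  // On failure: remove the partial output of this task attempt.
  def abortTask(): Unit = {
    written.foreach(Files.deleteIfExists(_))
    written = Nil
  }
}
```

In the real code path, Spark's FileCommitProtocol plays this role, and the cleanup of a failed attempt's output happens inside its abortTask().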

How was this patch tested?

Existing unit tests

@github-actions github-actions bot added the VELOX label Jun 3, 2025

github-actions bot commented Jun 3, 2025

#9801

@JkSelf
Contributor Author

JkSelf commented Jun 3, 2025

@ulysses-you Could you help review this PR? Thanks.

@JkSelf JkSelf force-pushed the delete-failed-task branch from 40f2dd8 to a538513 on June 3, 2025 05:13
@PHILO-HE
Contributor

PHILO-HE commented Jun 3, 2025

@RushabhK, could you try again?

@RushabhK

RushabhK commented Jun 3, 2025

> @RushabhK, could you try again?

Sure @PHILO-HE, will test out this change!

@FelixYBW
Contributor

FelixYBW commented Jun 3, 2025

@JkSelf just curious, why can't we let Spark handle this for Gluten?

@@ -119,6 +122,8 @@ class VeloxColumnarWriteFilesRDD(
val targetFileName = fileWriteInfo.targetFileName
val outputPath = description.path

fileNames += targetFileName
ulysses-you (Contributor)

It seems writePath does not contain the partitionFragment? We should collect partitionFragment + "/" + targetFileName.

JkSelf (Contributor Author)

@ulysses-you Yes. Updated.
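The fix suggested in this review thread can be sketched as follows. The names partitionFragment and targetFileName come from the diff above; the helper itself is illustrative, not the actual Gluten code:

```scala
import scala.collection.mutable.ArrayBuffer

// Collect the file path relative to the output root, so that partitioned
// writes can be located (and deleted on abort) later. partitionFragment is
// e.g. "date_key=2025-05-26/hour=00"; an empty fragment means the write is
// unpartitioned and the bare file name is collected.
def collectFileName(
    fileNames: ArrayBuffer[String],
    partitionFragment: String,
    targetFileName: String): Unit = {
  val relative =
    if (partitionFragment.isEmpty) targetFileName
    else partitionFragment + "/" + targetFileName
  fileNames += relative
}
```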

@JkSelf
Contributor Author

JkSelf commented Jun 4, 2025

> @JkSelf just curious, why can't we let Spark handle this for Gluten?

@FelixYBW Velox creates the file name and partition path during the file writing process, which prevents Gluten from directly using the HadoopMapReduceCommitProtocol offered by Spark to handle task commit operations.

@JkSelf JkSelf force-pushed the delete-failed-task branch from a538513 to 5a8d954 on June 4, 2025 06:34
@RushabhK

RushabhK commented Jun 5, 2025

@JkSelf I tested this change on my setup. It's still giving the same exception: "... is not a Parquet file. Expected magic number at tail, but found [2, 0, 0, 0]". The file is ~250 MB in size.

This is the complete stack trace:

Py4JJavaError: An error occurred while calling o135.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1161 in stage 1.0 failed 4 times, most recent failure: Lost task 1161.3 in stage 1.0 (TID 1208) (241.130.178.8 executor 2): java.lang.RuntimeException: gs://<some_path>/gluten-part-d0a3b6a4-ccc9-41b3-a44e-34177ab18674.zstd.parquet is not a Parquet file. Expected magic number at tail, but found [2, 0, 0, 0]
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:565)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:85)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:71)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:66)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:213)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: gs://<some_path>/gluten-part-d0a3b6a4-ccc9-41b3-a44e-34177ab18674.zstd.parquet is not a Parquet file. Expected magic number at tail, but found [2, 0, 0, 0]
	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:565)
	at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:85)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:71)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:66)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:213)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
[Stage 1:======================================>              (1168 + 4) / 1627]
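The failing check above is Parquet's footer validation: a valid Parquet file ends with the 4 ASCII bytes "PAR1", and this file ends with [2, 0, 0, 0] instead, i.e. it is a leftover partial write. A small self-contained helper (not part of Gluten or Spark; written here only to illustrate the check) to test a local file's tail:

```scala
import java.io.RandomAccessFile

// Returns true if the file ends with Parquet's 4-byte "PAR1" footer magic,
// mirroring the check that the exception above reports as failing.
def hasParquetMagic(path: String): Boolean = {
  val raf = new RandomAccessFile(path, "r")
  try {
    if (raf.length < 4) false
    else {
      raf.seek(raf.length - 4)
      val tail = new Array[Byte](4)
      raf.readFully(tail)
      tail.sameElements("PAR1".getBytes("US-ASCII"))
    }
  } finally raf.close()
}
```

A file truncated by a killed executor would fail this check exactly as in the stack trace.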

@JkSelf
Contributor Author

JkSelf commented Jun 5, 2025

> @JkSelf I tested this change on my setup. It's still giving the same exception, is not a Parquet file. Expected magic number at tail, but found [2, 0, 0, 0]. This file is ~250 MB size. [full stack trace quoted above]

@RushabhK OK. Could you provide the code to reproduce this? Thanks.

@RushabhK

RushabhK commented Jun 5, 2025

> @JkSelf I tested this change on my setup. It's still giving the same exception, is not a Parquet file. Expected magic number at tail, but found [2, 0, 0, 0]. This file is ~250 MB size. [full stack trace quoted above]
>
> @RushabhK Ok. Can you help to provide the reproduced code? Thanks.

@JkSelf Let me elaborate on how I am testing this:

  1. I took the Gluten build with these changes and built my new Spark image.
  2. I have a Spark job that writes parquet with 300 tasks, configured with 8 cores per executor.
  3. While it is writing from the 300 tasks, I kill one of the executors (8 failed tasks); it retries and then finishes.
  4. I then try reading the parquet files and just do a df.count() on them to materialize. This is when I encounter the following exception.

[screenshot of the exception]

@JkSelf
Contributor Author

JkSelf commented Jun 6, 2025

@RushabhK I followed the steps you provided using the main branch code, and the test code is as follows:

val store_sales = spark.read.format("parquet").load("/user/root/tpcds_parquet_partition__100/store_sales")

store_sales.write.mode("append").format("parquet").save("/user/root/parquet-write")

val velox_store_sales = spark.read.format("parquet").load("/user/root/parquet-write/")
velox_store_sales.count()

Here is the Spark history UI for reference.

[screenshot of the Spark history UI]

However, I was unable to reproduce the issue you mentioned. Am I missing something?

@RushabhK

RushabhK commented Jun 6, 2025

> @RushabhK I followed the steps you provided using the main branch code [test code and Spark history screenshot quoted above]. However, I was unable to reproduce the issue you mentioned. Am I missing something?

@JkSelf I am running this on Ubuntu; what OS are you running it on? Can you share the setup for your Spark image?
The setup looks right, although I am applying this patch on the v1.3.0 branch and not on main. Are there any fixes on the main branch related to this?

@RushabhK

RushabhK commented Jun 6, 2025

> @RushabhK I followed the steps you provided using the main branch code [quoted above]. However, I was unable to reproduce the issue you mentioned. Am I missing something?

> @JkSelf I am running this on Ubuntu; what OS are you running it on? Can you share the setup for your Spark image? The setup looks right, although I am applying this patch on the v1.3.0 branch and not on main. Are there any fixes on the main branch related to this?

Also @JkSelf, I am writing the parquet in overwrite mode. I am using the following code for the write:

df.coalesce(300).write.partitionBy("date_key", "hour")
  .format("parquet").mode("overwrite").option("compression", "zstd")
  .option("partitionOverwriteMode", "dynamic")
  .save("table_name")

@RushabhK

RushabhK commented Jun 9, 2025

@JkSelf I added some logs for better visibility into which files abortTask is deleting.
In all reproducing scenarios, abortTask always has 0 files: https://github.com/RushabhK/incubator-gluten/blob/v1.3.0-fixes/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala#L104
Sample log:

ERROR SparkWriteFilesCommitProtocol: Filenames info: 0 files, file names: 
ERROR SparkWriteFilesCommitProtocol: Filenames info: 0 files, file names: 

The code in the catch block likewise reports 0 files: https://github.com/RushabhK/incubator-gluten/blob/v1.3.0-fixes/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala#L244
Sample log:

ERROR VeloxColumnarWriteFilesRDD: Commit failed, aborting task. fileNames size: 0Deleting staging files
ERROR VeloxColumnarWriteFilesRDD: Commit failed, aborting task. fileNames size: 0Deleting staging files

I also added logs to check the fileNames status at every point. These show the fileNames size is 1 in all the logs: https://github.com/RushabhK/incubator-gluten/blob/v1.3.0-fixes/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala#L139
Sample logs:

ERROR VeloxColumnarWriteFilesRDD: Current filenames size: 1, filenames: date_key=2025-05-26/hour=00/gluten-part-b37dc941-ec8f-4a26-a189-0b9119014c8b.zstd.parquet
ERROR VeloxColumnarWriteFilesRDD: Current filenames size: 1, filenames: date_key=2025-05-26/hour=00/gluten-part-c924ab72-003b-4ae0-9f90-54695ce851d4.zstd.parquet
ERROR VeloxColumnarWriteFilesRDD: Current filenames size: 1, filenames: date_key=2025-05-26/hour=00/gluten-part-6cac7566-982f-486a-b918-e2c0e2bed5a2.zstd.parquet
ERROR VeloxColumnarWriteFilesRDD: Current filenames size: 1, filenames: date_key=2025-05-26/hour=00/gluten-part-6cac7566-982f-486a-b918-e2c0e2bed5a2.zstd.parquet

The problem is that 0 files are collected when abortTask is called; that is why no files are deleted on abort.
@JkSelf @FelixYBW Could you think of any possible reasons for this? Let me know what steps or logs I can add so we can troubleshoot this further.

@JkSelf
Contributor Author

JkSelf commented Jun 9, 2025

@RushabhK My OS is Ubuntu 20.04.6 and the Spark version is 3.5.2.
I also tried the following code and still cannot reproduce.

store_sales.coalesce(300).write.partitionBy("ss_sold_date_sk").mode("overwrite").option("compression", "zstd").option("partitionOverwriteMode", "dynamic").format("parquet").save("/user/root/parquet-write")

@JkSelf
Contributor Author

JkSelf commented Jun 9, 2025


Are you saying that fileNames is populated when collectNativeWriteFilesMetrics is called, but is empty when abortTask is invoked?

@RushabhK

RushabhK commented Jun 9, 2025


Are you saying that fileNames is populated when collectNativeWriteFilesMetrics is called, but is empty when abortTask is invoked?

Yes, fileNames is empty when abortTask is invoked. I also tried maintaining localFileNames inside compute to rule out a scoping issue, but even that shows the same behavior.
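One plausible mechanism for fileNames being populated in compute but empty in abortTask — offered only as a hypothesis, not confirmed against the Gluten code — is that the two code paths end up holding different instances of the same object: Spark Java-serializes task closures, so a mutable buffer mutated on the deserialized executor-side copy never updates the instance another code path reads. The sketch below uses hypothetical class names (Protocol, CommitProtocolDemo), not Gluten's actual classes, and simulates the closure shipping with a plain serialization round trip:

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a commit-protocol-like object holding a
// mutable buffer of written file names. NOT Gluten's actual class.
class Protocol extends Serializable {
  val fileNames: ArrayBuffer[String] = ArrayBuffer.empty[String]
}

object CommitProtocolDemo {
  // Serialize and deserialize, as Spark does when shipping a task closure
  // to an executor: the result is a distinct deep copy of the original.
  def roundTrip[T](obj: T): T = {
    val out = new ByteArrayOutputStream()
    new ObjectOutputStream(out).writeObject(obj)
    new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val original = new Protocol      // the instance one code path holds
    val copy = roundTrip(original)   // the instance the task actually mutates
    copy.fileNames += "gluten-part-0000.parquet"

    // Mutations on the copy are invisible to the original instance.
    println(s"original sees ${original.fileNames.size} files") // 0
    println(s"copy sees ${copy.fileNames.size} files")         // 1
  }
}
```

If something like this is the cause, returning the file names in the task result (or via accumulators) rather than sharing a mutable buffer across a serialization boundary would make them visible wherever abortTask runs.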

@JkSelf JkSelf force-pushed the delete-failed-task branch from 5a8d954 to fb9ec0c on June 17, 2025 05:57
@github-actions github-actions bot added the BUILD label Jun 17, 2025
@JkSelf JkSelf force-pushed the delete-failed-task branch from fb9ec0c to e14720b on July 9, 2025 07:50
@github-actions github-actions bot removed the BUILD label Jul 9, 2025
@JkSelf JkSelf force-pushed the delete-failed-task branch from e14720b to 663107b on July 9, 2025 07:57
@JkSelf
Copy link
Contributor Author

JkSelf commented Jul 9, 2025

@RushabhK Could you help to try the latest commit again? Thanks.

@JkSelf JkSelf force-pushed the delete-failed-task branch 2 times, most recently from 8694073 to 7ce2fe2 on July 17, 2025 02:12
@JkSelf JkSelf changed the title [GLUTEN-9801] Only delete the files created by the failed task [GLUTEN-9801] Clean up Write Files commit Protocol logic Jul 17, 2025

Successfully merging this pull request may close these issues.

[VL] Native parquet writer writing a corrupt / invalid parquet file on spark task failure