
Spark rewrite_data_files failing with java.lang.IllegalStateException: Connection pool shut down #12046

Open · mgmarino opened this issue Jan 22, 2025 · 4 comments
Labels: bug (Something isn't working)

@mgmarino
Apache Iceberg version

1.7.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

We are running a maintenance job on AWS Glue to rewrite data files (in parallel), calling the rewrite_data_files procedure as follows:

sc.sql(
    f"""CALL iceberg.system.rewrite_data_files(
        table => 'dest_table',
        strategy => 'sort',
        options => map(
            'max-concurrent-file-group-rewrites', '44',
            'rewrite-job-order', 'bytes-desc',
            'partial-progress.enabled', 'true'
        ),
        where => 'ts >= timestamp \"{rewrite_begin}\" AND ts < timestamp \"{rewrite_end}\"'
    )
    """
)

We are getting errors like the following:

java.lang.IllegalStateException: Connection pool shut down
	at org.apache.iceberg.aws.shaded.org.apache.http.util.Asserts.check(Asserts.java:34)
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75)
	at software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57)
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176)
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254)
	at software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231)
	at software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:228)
	at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:99)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:57)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:40)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:74)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:79)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:41)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.executeRequest(RetryableStage2.java:93)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:56)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage2.execute(RetryableStage2.java:36)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:53)
	at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:35)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:82)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:62)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:43)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:210)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:173)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$0(BaseSyncClientHandler.java:66)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:60)
	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:52)
	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:60)
	at software.amazon.awssdk.services.s3.DefaultS3Client.getObject(DefaultS3Client.java:5404)
	at org.apache.iceberg.aws.s3.S3InputStream.openStream(S3InputStream.java:240)
	at org.apache.iceberg.aws.s3.S3InputStream.openStream(S3InputStream.java:225)
	at org.apache.iceberg.aws.s3.S3InputStream.positionStream(S3InputStream.java:221)
	at org.apache.iceberg.aws.s3.S3InputStream.read(S3InputStream.java:122)
	at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingSeekableInputStream.read(DelegatingSeekableInputStream.java:61)
	at org.apache.iceberg.shaded.org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:556)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:799)
	at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:666)
	at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:238)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:81)
	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:71)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:91)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:37)
	at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:72)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:43)
	at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:143)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:57)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:369)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:367)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:911)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:911)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:61)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:368)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:332)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
	at org.apache.spark.scheduler.Task.run(Task.scala:152)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

A few points:

  • We saw this error on both Glue 4.0 (Spark 3.3) and Glue 5.0 (with Iceberg 1.6 and 1.7.1), but it has increased significantly with Glue 5.0 (Spark 3.5.2). Thinking it might be due to Revert "Hive: close the fileIO client when closing the hive catalog" #11858, I built a version of Iceberg from the 1.7.x branch including the recent changes, but the error remained.
  • The error leads to the rewrite not succeeding completely (some groups do succeed and are committed, but most are not), yet the Spark job itself never fails, which meant it took a while for us to catch.
  • It seems to happen with the "second" set of tasks (see the screenshot below), suggesting to me that the lifecycle of this connection pool is simply not handled correctly.

[Screenshot: Spark UI showing task failures beginning with the second set of tasks]

I am happy to provide additional information and help with a fix, but I'd need some guidance on how to do so.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
mgmarino added the bug label Jan 22, 2025
@mgmarino (Author) commented Jan 23, 2025

I tried to trace where the connection pool is being closed. Aside from calls stemming from finalizers on Thread shutdown (which seem perfectly legitimate), I see:

ERROR PoolingHttpClientConnectionManager: Shutting down Pool: 
java.lang.Exception: shutting down
	at org.apache.iceberg.aws.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.shutdown(PoolingHttpClientConnectionManager.java:410) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.http.apache.ApacheHttpClient.close(ApacheHttpClient.java:247) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.utils.IoUtils.closeQuietly(IoUtils.java:70) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.utils.IoUtils.closeIfCloseable(IoUtils.java:87) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.utils.AttributeMap.closeIfPossible(AttributeMap.java:678) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.utils.AttributeMap.access$1600(AttributeMap.java:49) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.utils.AttributeMap$DerivedValue.close(AttributeMap.java:632) ~[custom-jar-glue-job-680fb4e.jar:?]
	at java.util.HashMap$Values.forEach(HashMap.java:1065) ~[?:?]
	at software.amazon.awssdk.utils.AttributeMap.close(AttributeMap.java:107) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.core.client.config.SdkClientConfiguration.close(SdkClientConfiguration.java:118) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.core.internal.http.HttpClientDependencies.close(HttpClientDependencies.java:82) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient.close(AmazonSyncHttpClient.java:76) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.close(BaseSyncClientHandler.java:86) ~[custom-jar-glue-job-680fb4e.jar:?]
	at software.amazon.awssdk.services.s3.DefaultS3Client.close(DefaultS3Client.java:12477) ~[custom-jar-glue-job-680fb4e.jar:?]
	at org.apache.iceberg.aws.s3.S3FileIO.close(S3FileIO.java:417) ~[custom-jar-glue-job-680fb4e.jar:?]
	at org.apache.iceberg.spark.source.SerializableTableWithSize.close(SerializableTableWithSize.java:69) ~[custom-jar-glue-job-680fb4e.jar:?]
	at org.apache.spark.storage.memory.MemoryStore.$anonfun$freeMemoryEntry$1(MemoryStore.scala:410) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.memory.MemoryStore.$anonfun$freeMemoryEntry$1$adapted(MemoryStore.scala:407) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) ~[scala-library-2.12.18.jar:?]
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) ~[scala-library-2.12.18.jar:?]
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.storage.memory.MemoryStore.freeMemoryEntry(MemoryStore.scala:407) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.memory.MemoryStore.remove(MemoryStore.scala:425) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:2012) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.memory.MemoryStore.dropBlock$1(MemoryStore.scala:503) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.memory.MemoryStore.$anonfun$evictBlocksToFreeSpace$4(MemoryStore.scala:529) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.storage.memory.MemoryStore.evictBlocksToFreeSpace(MemoryStore.scala:520) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:93) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.memory.StorageMemoryPool.acquireMemory(StorageMemoryPool.scala:74) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.memory.UnifiedMemoryManager.acquireStorageMemory(UnifiedMemoryManager.scala:181) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.memory.MemoryStore.putBytes(MemoryStore.scala:151) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.BlockManager$BlockStoreUpdater.saveSerializedValuesToMemoryStore(BlockManager.scala:363) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.BlockManager$BlockStoreUpdater.$anonfun$save$1(BlockManager.scala:404) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1540) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:384) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1484) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:240) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23) ~[scala-library-2.12.18.jar:?]
	at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:212) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:308) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at scala.Option.getOrElse(Option.scala:189) ~[scala-library-2.12.18.jar:?]
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:284) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:279) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException(SparkErrorUtils.scala:35) ~[spark-common-utils_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.SparkErrorUtils.tryOrIOException$(SparkErrorUtils.scala:33) ~[spark-common-utils_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:96) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:279) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:125) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:77) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.scheduler.Task.run(Task.scala:152) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632) ~[spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) [spark-common-utils_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) [spark-common-utils_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96) [spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635) [spark-core_2.12-3.5.2-amzn-1.jar:3.5.2-amzn-1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:840) [?:?]

From the trace, I would pick out the relevant line:

at org.apache.iceberg.spark.source.SerializableTableWithSize.close(SerializableTableWithSize.java:69)

My suspicion is that this IO object (created/obtained here, I believe) is shared with the reader. (EDIT: Indeed, this behavior is mentioned in #11633 (comment).)

Since we are using the Glue catalog, I believe this IO object will likely come all the way from GlueTableOperations.

I am not completely familiar with the internals of Spark here, but from the trace it looks like the MemoryStore is evicting cached blocks to free up memory, presumably because it is running up against its limits. As such, I could imagine this only happens in fairly particular situations. For us, it could also explain why we saw the error occasionally with Glue 4.0 and now more often with Glue 5.0, since behavior with respect to memory could have changed between versions.

@mgmarino (Author) commented:
OK, I can confirm that commenting out the referenced code (the FileIO close() reached via SerializableTableWithSize.close() in the trace above) allows the job to run to completion.

@mgmarino (Author) commented:
Just for documentation: something similar seems to have been discussed in #8685 (comment) when SerializableTableWithSize was made closeable.

@mgmarino (Author) commented Jan 24, 2025

After doing some further investigation, my initial conclusion is the following:

  • I can see SerializableTableWithSize being generated on the driver in at least two different places.
  • If these jobs get submitted to the same executor, then on deserialization they will still point to the same IO object, meaning that when one gets cleaned up (and closed), it affects the other. This leads to the closure of the pool object, which then causes the IllegalStateException when a subsequent task tries to access it again (see the sketch below).
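To illustrate the failure mode, here is a toy sketch (not Iceberg code; all names are made up): two deserialized tables on one executor hold references to the same closeable pool, so closing either one poisons the other.

import java.io.Closeable;

// Toy stand-in for the shaded Apache HTTP connection pool inside the SDK's S3 client.
class ToyConnectionPool implements Closeable {
    private volatile boolean shutDown = false;

    void requestConnection() {
        // Mirrors org.apache.http.util.Asserts.check(...) in the real pool.
        if (shutDown) {
            throw new IllegalStateException("Connection pool shut down");
        }
    }

    @Override
    public void close() {
        shutDown = true;
    }
}

public class SharedFileIoHazard {
    public static void main(String[] args) {
        // Two "tables" deserialized on the same executor end up holding
        // the SAME underlying client/pool instead of independent copies.
        ToyConnectionPool shared = new ToyConnectionPool();
        ToyConnectionPool tableAIo = shared;
        ToyConnectionPool tableBIo = shared;

        tableAIo.close();             // Spark evicts table A's block -> FileIO closed
        tableBIo.requestConnection(); // a task still using table B now throws the ISE
    }
}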

I am not sure what a good solution is here, but I suspect that the FileIO may need to be copied when creating the serializable table, instead of being shared as it is now.
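A rough sketch of that idea (hedged: this is not the actual Iceberg code, and the property plumbing would need care, since S3FileIO is normally configured from catalog properties rather than table properties):

import java.util.Map;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileIO;

class IndependentIo {
    // Hypothetical helper for illustration: instantiate a fresh FileIO (and
    // thus a fresh HTTP client/connection pool) instead of sharing table.io()
    // across serialized copies, so each copy owns the pool it later closes.
    static FileIO forSerializableTable(Table table, Object hadoopConf) {
        Map<String, String> props = table.properties();
        return CatalogUtil.loadFileIO(table.io().getClass().getName(), props, hadoopConf);
    }
}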

Would love to get some input here!
