Skip to content

Commit ab357d1

Browse files
andygrove and claude committed
fix: fall back scan when plan uses input_file_name expressions
CometScanExec does not populate InputFileBlockHolder (the thread-local that Spark's FileScanRDD sets), so input_file_name(), input_file_block_start(), and input_file_block_length() return empty or default values when Comet replaces the scan. Detect these expressions in the plan and fall back to Spark's FileSourceScanExec. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 188cd86 commit ab357d1

File tree

1 file changed

+22
-5
lines changed

1 file changed

+22
-5
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import scala.jdk.CollectionConverters._
2828
import org.apache.hadoop.conf.Configuration
2929
import org.apache.spark.internal.Logging
3030
import org.apache.spark.sql.SparkSession
31-
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, PlanExpression}
31+
import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, GenericInternalRow, InputFileBlockLength, InputFileBlockStart, InputFileName, PlanExpression}
3232
import org.apache.spark.sql.catalyst.rules.Rule
3333
import org.apache.spark.sql.catalyst.util.{sideBySide, ArrayBasedMapData, GenericArrayData, MetadataColumnHelper}
3434
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
@@ -110,7 +110,9 @@ case class CometScanRule(session: SparkSession)
110110
metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix))
111111
}
112112

113-
def transformScan(plan: SparkPlan): SparkPlan = plan match {
113+
val fullPlan = plan
114+
115+
def transformScan(scanNode: SparkPlan): SparkPlan = scanNode match {
114116
case scan if !CometConf.COMET_NATIVE_SCAN_ENABLED.get(conf) =>
115117
withInfo(scan, "Comet Scan is not enabled")
116118

@@ -119,7 +121,7 @@ case class CometScanRule(session: SparkSession)
119121

120122
// data source V1
121123
case scanExec: FileSourceScanExec =>
122-
transformV1Scan(scanExec)
124+
transformV1Scan(fullPlan, scanExec)
123125

124126
// data source V2
125127
case scanExec: BatchScanExec =>
@@ -135,7 +137,7 @@ case class CometScanRule(session: SparkSession)
135137
}
136138
}
137139

138-
private def transformV1Scan(scanExec: FileSourceScanExec): SparkPlan = {
140+
private def transformV1Scan(plan: SparkPlan, scanExec: FileSourceScanExec): SparkPlan = {
139141

140142
if (COMET_DPP_FALLBACK_ENABLED.get() &&
141143
scanExec.partitionFilters.exists(isDynamicPruningFilter)) {
@@ -170,7 +172,7 @@ case class CometScanRule(session: SparkSession)
170172
nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
171173
.getOrElse(scanExec)
172174
case SCAN_NATIVE_DATAFUSION =>
173-
nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
175+
nativeDataFusionScan(plan, session, scanExec, r, hadoopConf).getOrElse(scanExec)
174176
case SCAN_NATIVE_ICEBERG_COMPAT =>
175177
nativeIcebergCompatScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
176178
}
@@ -181,6 +183,7 @@ case class CometScanRule(session: SparkSession)
181183
}
182184

183185
private def nativeDataFusionScan(
186+
plan: SparkPlan,
184187
session: SparkSession,
185188
scanExec: FileSourceScanExec,
186189
r: HadoopFsRelation,
@@ -196,6 +199,20 @@ case class CometScanRule(session: SparkSession)
196199
withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
197200
return None
198201
}
202+
// input_file_name, input_file_block_start, and input_file_block_length read from
203+
// InputFileBlockHolder, a thread-local set by Spark's FileScanRDD. The native DataFusion
204+
// scan does not use FileScanRDD, so these expressions would return empty/default values.
205+
if (plan.exists(node =>
206+
node.expressions.exists(_.exists {
207+
case _: InputFileName | _: InputFileBlockStart | _: InputFileBlockLength => true
208+
case _ => false
209+
}))) {
210+
withInfo(
211+
scanExec,
212+
"Native DataFusion scan is not compatible with input_file_name, " +
213+
"input_file_block_start, or input_file_block_length")
214+
return None
215+
}
199216
if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
200217
withInfo(scanExec, "Native DataFusion scan does not support row index generation")
201218
return None

0 commit comments

Comments (0)